
Commit 9b36870

Set staging bucket to be deleted a specified number of days after deletion of cluster (#792)

* works?

* Well that was silly

* lets see

* debugging

* what

* catching up

* one more unit test to go

* wat

* unit tests are good

* cleaning up PR

* no magic numbers

* some cleaning up

* well that was silly

* comment update
akarukappadath committed Mar 9, 2019
1 parent e352353 commit 9b36870
Showing 13 changed files with 85 additions and 20 deletions.
1 change: 1 addition & 0 deletions src/main/resources/logback.xml
@@ -61,6 +61,7 @@
<appender-ref ref="console"/>
</logger>
-->

<!--
<logger name="slick.jdbc.JdbcBackend.benchmark" level="DEBUG" additivity="false">
<appender-ref ref="file"/>
5 changes: 5 additions & 0 deletions src/main/resources/reference.conf
@@ -121,3 +121,8 @@ zombieClusterMonitor {
leoExecutionMode {
backLeo = true
}

clusterBucket {
# number of days the staging bucket should continue to exist after a cluster is deleted
stagingBucketExpiration = 10 days
}
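
The "10 days" value is a standard HOCON duration. A minimal sketch of how such a value becomes the FiniteDuration that ClusterBucketConfig expects (the toScalaDuration helper used later in this commit presumably does something equivalent):

import com.typesafe.config.ConfigFactory
import scala.concurrent.duration.{Duration, FiniteDuration}

// Parse the new clusterBucket block and convert the HOCON duration to a Scala FiniteDuration.
val conf = ConfigFactory.parseString("clusterBucket { stagingBucketExpiration = 10 days }")
val javaDuration = conf.getDuration("clusterBucket.stagingBucketExpiration") // java.time.Duration of 10 days
val expiration: FiniteDuration = Duration.fromNanos(javaDuration.toNanos)    // FiniteDuration of 10 days
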
@@ -10,7 +10,7 @@ import org.broadinstitute.dsde.workbench.google.GoogleCredentialModes.{Pem, Toke
import org.broadinstitute.dsde.workbench.google.{GoogleStorageDAO, HttpGoogleIamDAO, HttpGoogleProjectDAO, HttpGoogleStorageDAO}
import org.broadinstitute.dsde.workbench.leonardo.api.{LeoRoutes, StandardUserInfoDirectives}
import org.broadinstitute.dsde.workbench.leonardo.auth.{LeoAuthProviderHelper, ServiceAccountProviderHelper}
import org.broadinstitute.dsde.workbench.leonardo.config.{AutoFreezeConfig, ClusterDefaultsConfig, ClusterDnsCacheConfig, ClusterFilesConfig, ClusterResourcesConfig, DataprocConfig, LeoExecutionModeConfig, MonitorConfig, ProxyConfig, SamConfig, SwaggerConfig, ZombieClusterConfig}
import org.broadinstitute.dsde.workbench.leonardo.config.{AutoFreezeConfig, ClusterDefaultsConfig, ClusterDnsCacheConfig, ClusterFilesConfig, ClusterResourcesConfig, DataprocConfig, LeoExecutionModeConfig, MonitorConfig, ProxyConfig, SamConfig, SwaggerConfig, ZombieClusterConfig, ClusterBucketConfig}
import org.broadinstitute.dsde.workbench.leonardo.dao.{HttpJupyterDAO, HttpSamDAO}
import org.broadinstitute.dsde.workbench.leonardo.dao.google.{HttpGoogleComputeDAO, HttpGoogleDataprocDAO}
import org.broadinstitute.dsde.workbench.leonardo.db.DbReference
@@ -51,6 +51,7 @@ object Boot extends App with LazyLogging {
val zombieClusterMonitorConfig = config.as[ZombieClusterConfig]("zombieClusterMonitor")
val clusterDnsCacheConfig = config.as[ClusterDnsCacheConfig]("clusterDnsCache")
val leoExecutionModeConfig = config.as[LeoExecutionModeConfig]("leoExecutionMode")
val clusterBucketConfig = config.as[ClusterBucketConfig]("clusterBucket")

// we need an ActorSystem to host our application in
implicit val system = ActorSystem("leonardo")
@@ -86,7 +87,7 @@
if(leoExecutionModeConfig.backLeo) {
val googleProjectDAO = new HttpGoogleProjectDAO(dataprocConfig.applicationName, Pem(leoServiceAccountEmail, leoServiceAccountPemFile), "google")
val jupyterDAO = new HttpJupyterDAO(clusterDnsCache)
val clusterMonitorSupervisor = system.actorOf(ClusterMonitorSupervisor.props(monitorConfig, dataprocConfig, gdDAO, googleComputeDAO, googleIamDAO, googleStorageDAO, dbRef, authProvider, autoFreezeConfig, jupyterDAO, leonardoService))
val clusterMonitorSupervisor = system.actorOf(ClusterMonitorSupervisor.props(monitorConfig, dataprocConfig, clusterBucketConfig, gdDAO, googleComputeDAO, googleIamDAO, googleStorageDAO, dbRef, authProvider, autoFreezeConfig, jupyterDAO, leonardoService))
val zombieClusterMonitor = system.actorOf(ZombieClusterMonitor.props(zombieClusterMonitorConfig, gdDAO, googleProjectDAO, dbRef))
}

@@ -0,0 +1,7 @@
package org.broadinstitute.dsde.workbench.leonardo.config

import scala.concurrent.duration.FiniteDuration

case class ClusterBucketConfig(
stagingBucketExpiration: FiniteDuration
)
@@ -125,4 +125,10 @@ package object config {
config.getBoolean("backLeo")
)
}

implicit val clusterBucketConfig: ValueReader[ClusterBucketConfig] = ValueReader.relative { config =>
ClusterBucketConfig(
toScalaDuration(config.getDuration("stagingBucketExpiration"))
)
}
}
@@ -245,6 +245,18 @@ trait ClusterComponent extends LeoComponent {
.map { recs => recs.headOption.flatten.flatMap(head => parseGcsPath(head).toOption) }
}

def getStagingBucket(project: GoogleProject, name: ClusterName): DBIO[Option[GcsPath]] = {

clusterQuery
.filter { _.googleProject === project.value }
.filter { _.clusterName === name.value }
.map(_.stagingBucket)
.result
// staging bucket is saved as a bucket name rather than a path
.map { recs => recs.headOption.flatten.flatMap(head => parseGcsPath("gs://" + head + "/").toOption)
}
}

def getServiceAccountKeyId(project: GoogleProject, name: ClusterName): DBIO[Option[ServiceAccountKeyId]] = {
clusterQuery
.filter { _.googleProject === project.value }
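
Note in getStagingBucket above that only the bucket name is persisted, so the query rebuilds a full gs:// URI before parsing it into a GcsPath. A minimal illustration with simplified stand-in types (these are not the real workbench-libs GcsPath model):

// Simplified stand-ins, for illustration only.
final case class GcsBucketName(value: String)
final case class GcsPath(bucketName: GcsBucketName, objectName: String)

// Parse a gs:// URI into bucket and object name, if well formed.
def parseGcsPath(uri: String): Option[GcsPath] =
  uri.stripPrefix("gs://").split("/", 2) match {
    case Array(bucket, obj) if bucket.nonEmpty => Some(GcsPath(GcsBucketName(bucket), obj))
    case _                                     => None
  }

// The stagingBucket column holds just the name, so "gs://" and a trailing "/" are added first.
parseGcsPath("gs://" + "stagingbucketname1" + "/") // Some(GcsPath(GcsBucketName("stagingbucketname1"), ""))
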
@@ -1,17 +1,18 @@
package org.broadinstitute.dsde.workbench.leonardo.monitor

import java.time.Instant
import java.time.temporal.ChronoUnit

import akka.actor.Status.Failure
import akka.actor.{Actor, ActorSystem, Props}
import akka.actor.{Actor, Props}
import akka.pattern.pipe
import cats.data.OptionT
import cats.implicits._
import com.google.api.client.googleapis.json.GoogleJsonResponseException
import com.typesafe.scalalogging.LazyLogging
import io.grpc.Status.Code
import org.broadinstitute.dsde.workbench.google.{GoogleIamDAO, GoogleStorageDAO}
import org.broadinstitute.dsde.workbench.leonardo.config.{DataprocConfig, MonitorConfig}
import org.broadinstitute.dsde.workbench.leonardo.config.{DataprocConfig, MonitorConfig, ClusterBucketConfig}
import org.broadinstitute.dsde.workbench.leonardo.dao.JupyterDAO
import org.broadinstitute.dsde.workbench.leonardo.dao.google.{GoogleComputeDAO, GoogleDataprocDAO}
import org.broadinstitute.dsde.workbench.leonardo.db.DbReference
@@ -20,6 +21,7 @@ import org.broadinstitute.dsde.workbench.leonardo.model.google.ClusterStatus._
import org.broadinstitute.dsde.workbench.leonardo.model.google.{ClusterStatus, IP, _}
import org.broadinstitute.dsde.workbench.leonardo.monitor.ClusterMonitorActor._
import org.broadinstitute.dsde.workbench.leonardo.monitor.ClusterMonitorSupervisor.{ClusterDeleted, ClusterSupervisorMessage, RemoveFromList}
import org.broadinstitute.dsde.workbench.model.google.GcsLifecycleTypes
import org.broadinstitute.dsde.workbench.util.{Retry, addJitter}
import slick.dbio.DBIOAction

@@ -31,8 +33,8 @@ object ClusterMonitorActor {
/**
* Creates a Props object used for creating a {{{ClusterMonitorActor}}}.
*/
def props(cluster: Cluster, monitorConfig: MonitorConfig, dataprocConfig: DataprocConfig, gdDAO: GoogleDataprocDAO, googleComputeDAO: GoogleComputeDAO, googleIamDAO: GoogleIamDAO, googleStorageDAO: GoogleStorageDAO, dbRef: DbReference, authProvider: LeoAuthProvider, jupyterProxyDAO: JupyterDAO): Props =
Props(new ClusterMonitorActor(cluster, monitorConfig, dataprocConfig, gdDAO, googleComputeDAO, googleIamDAO, googleStorageDAO, dbRef, authProvider, jupyterProxyDAO))
def props(cluster: Cluster, monitorConfig: MonitorConfig, dataprocConfig: DataprocConfig, clusterBucketConfig: ClusterBucketConfig, gdDAO: GoogleDataprocDAO, googleComputeDAO: GoogleComputeDAO, googleIamDAO: GoogleIamDAO, googleStorageDAO: GoogleStorageDAO, dbRef: DbReference, authProvider: LeoAuthProvider, jupyterProxyDAO: JupyterDAO): Props =
Props(new ClusterMonitorActor(cluster, monitorConfig, dataprocConfig, clusterBucketConfig, gdDAO, googleComputeDAO, googleIamDAO, googleStorageDAO, dbRef, authProvider, jupyterProxyDAO))

// ClusterMonitorActor messages:

@@ -58,6 +60,7 @@ object ClusterMonitorActor {
class ClusterMonitorActor(val cluster: Cluster,
val monitorConfig: MonitorConfig,
val dataprocConfig: DataprocConfig,
val clusterBucketConfig: ClusterBucketConfig,
val gdDAO: GoogleDataprocDAO,
val googleComputeDAO: GoogleComputeDAO,
val googleIamDAO: GoogleIamDAO,
@@ -230,6 +233,9 @@ class ClusterMonitorActor(val cluster: Cluster,
// delete the init bucket so we don't continue to accrue costs after cluster is deleted
_ <- deleteInitBucket

// set the staging bucket to be deleted the configured number of days after cluster deletion so that logs are still accessible until then
_ <- setStagingBucketLifecycle

// delete instances in the DB
_ <- persistInstances(Set.empty)

@@ -395,14 +401,28 @@ class ClusterMonitorActor(val cluster: Cluster,
dbRef.inTransaction { dataAccess =>
dataAccess.clusterQuery.getInitBucket(cluster.googleProject, cluster.clusterName)
} flatMap {
case None => Future.successful( logger.warn(s"Could not lookup bucket for cluster ${cluster.projectNameString}: cluster not in db") )
case None => Future.successful( logger.warn(s"Could not lookup init bucket for cluster ${cluster.projectNameString}: cluster not in db") )
case Some(bucketPath) =>
googleStorageDAO.deleteBucket(bucketPath.bucketName, recurse = true) map { _ =>
logger.debug(s"Deleted init bucket $bucketPath for cluster ${cluster.googleProject}/${cluster.clusterName}")
}
}
}

private def setStagingBucketLifecycle: Future[Unit] = {
// Get the staging bucket path for this cluster, then set the age for it to be deleted the specified number of days after the deletion of the cluster.
dbRef.inTransaction { dataAccess =>
dataAccess.clusterQuery.getStagingBucket(cluster.googleProject, cluster.clusterName)
} flatMap {
case None => Future.successful( logger.warn(s"Could not lookup staging bucket for cluster ${cluster.projectNameString}: cluster not in db") )
case Some(bucketPath) =>
val ageToDelete = cluster.auditInfo.createdDate.until(Instant.now(), ChronoUnit.DAYS).toInt + clusterBucketConfig.stagingBucketExpiration.toDays.toInt
googleStorageDAO.setBucketLifecycle(bucketPath.bucketName, ageToDelete, GcsLifecycleTypes.Delete) map { _ =>
logger.debug(s"Set staging bucket $bucketPath for cluster ${cluster.googleProject}/${cluster.clusterName} to be deleted in ${ageToDelete} days.")
}
}
}

private def removeCredentialsFromMetadata: Future[Unit] = {
cluster.serviceAccountInfo.notebookServiceAccount match {
// No notebook service account: don't remove creds from metadata! We need them.
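
A note on the age arithmetic in setStagingBucketLifecycle above: a GCS lifecycle Delete rule with an age condition counts from each object's creation time, not from the moment the cluster is deleted, so the cluster's elapsed lifetime is added to the configured retention window. A small worked example with assumed values (a cluster created three days ago and the default 10-day expiration):

import java.time.Instant
import java.time.temporal.ChronoUnit
import scala.concurrent.duration._

// Assumed values, for illustration only.
val createdDate = Instant.now().minus(3, ChronoUnit.DAYS) // cluster (and its staging bucket) created 3 days ago
val stagingBucketExpiration = 10.days                     // clusterBucket.stagingBucketExpiration

// Days the cluster has existed plus the configured post-deletion retention.
val ageToDelete = createdDate.until(Instant.now(), ChronoUnit.DAYS).toInt +
  stagingBucketExpiration.toDays.toInt
// ageToDelete == 13: objects become eligible for deletion about 13 days after they were written,
// which is roughly 10 days after the cluster itself was deleted.
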
@@ -4,7 +4,7 @@ import akka.actor.SupervisorStrategy.Restart
import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props, Timers}
import com.typesafe.scalalogging.LazyLogging
import org.broadinstitute.dsde.workbench.google.{GoogleIamDAO, GoogleStorageDAO}
import org.broadinstitute.dsde.workbench.leonardo.config.{AutoFreezeConfig, DataprocConfig, MonitorConfig}
import org.broadinstitute.dsde.workbench.leonardo.config.{AutoFreezeConfig, ClusterBucketConfig, DataprocConfig, MonitorConfig}
import org.broadinstitute.dsde.workbench.leonardo.dao.JupyterDAO
import org.broadinstitute.dsde.workbench.leonardo.dao.google.{GoogleComputeDAO, GoogleDataprocDAO}
import org.broadinstitute.dsde.workbench.leonardo.db.DbReference
@@ -19,8 +19,8 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object ClusterMonitorSupervisor {
def props(monitorConfig: MonitorConfig, dataprocConfig: DataprocConfig, gdDAO: GoogleDataprocDAO, googleComputeDAO: GoogleComputeDAO, googleIamDAO: GoogleIamDAO, googleStorageDAO: GoogleStorageDAO, dbRef: DbReference, authProvider: LeoAuthProvider, autoFreezeConfig: AutoFreezeConfig, jupyterProxyDAO: JupyterDAO, leonardoService: LeonardoService): Props =
Props(new ClusterMonitorSupervisor(monitorConfig, dataprocConfig, gdDAO, googleComputeDAO, googleIamDAO, googleStorageDAO, dbRef, authProvider, autoFreezeConfig, jupyterProxyDAO, leonardoService))
def props(monitorConfig: MonitorConfig, dataprocConfig: DataprocConfig, clusterBucketConfig: ClusterBucketConfig, gdDAO: GoogleDataprocDAO, googleComputeDAO: GoogleComputeDAO, googleIamDAO: GoogleIamDAO, googleStorageDAO: GoogleStorageDAO, dbRef: DbReference, authProvider: LeoAuthProvider, autoFreezeConfig: AutoFreezeConfig, jupyterProxyDAO: JupyterDAO, leonardoService: LeonardoService): Props =
Props(new ClusterMonitorSupervisor(monitorConfig, dataprocConfig, clusterBucketConfig, gdDAO, googleComputeDAO, googleIamDAO, googleStorageDAO, dbRef, authProvider, autoFreezeConfig, jupyterProxyDAO, leonardoService))

sealed trait ClusterSupervisorMessage

@@ -49,7 +49,7 @@ object ClusterMonitorSupervisor {
private case object CheckForClusters extends ClusterSupervisorMessage
}

class ClusterMonitorSupervisor(monitorConfig: MonitorConfig, dataprocConfig: DataprocConfig, gdDAO: GoogleDataprocDAO, googleComputeDAO: GoogleComputeDAO, googleIamDAO: GoogleIamDAO, googleStorageDAO: GoogleStorageDAO, dbRef: DbReference, authProvider: LeoAuthProvider, autoFreezeConfig: AutoFreezeConfig, jupyterProxyDAO: JupyterDAO, leonardoService: LeonardoService)
class ClusterMonitorSupervisor(monitorConfig: MonitorConfig, dataprocConfig: DataprocConfig, clusterBucketConfig: ClusterBucketConfig, gdDAO: GoogleDataprocDAO, googleComputeDAO: GoogleComputeDAO, googleIamDAO: GoogleIamDAO, googleStorageDAO: GoogleStorageDAO, dbRef: DbReference, authProvider: LeoAuthProvider, autoFreezeConfig: AutoFreezeConfig, jupyterProxyDAO: JupyterDAO, leonardoService: LeonardoService)
extends Actor with Timers with LazyLogging {
import context.dispatcher

@@ -149,7 +149,7 @@ class ClusterMonitorSupervisor(monitorConfig: MonitorConfig, dataprocConfig: Dat
}

def createChildActor(cluster: Cluster): ActorRef = {
context.actorOf(ClusterMonitorActor.props(cluster, monitorConfig, dataprocConfig, gdDAO, googleComputeDAO, googleIamDAO, googleStorageDAO, dbRef, authProvider, jupyterProxyDAO))
context.actorOf(ClusterMonitorActor.props(cluster, monitorConfig, dataprocConfig, clusterBucketConfig, gdDAO, googleComputeDAO, googleIamDAO, googleStorageDAO, dbRef, authProvider, jupyterProxyDAO))
}

def startClusterMonitorActor(cluster: Cluster, watchMessageOpt: Option[ClusterSupervisorMessage] = None): Unit = {
@@ -9,7 +9,7 @@ import net.ceedubs.ficus.Ficus._
import org.broadinstitute.dsde.workbench.google.mock.MockGoogleDataprocDAO
import org.broadinstitute.dsde.workbench.leonardo.auth.WhitelistAuthProvider
import org.broadinstitute.dsde.workbench.leonardo.auth.sam.MockPetClusterServiceAccountProvider
import org.broadinstitute.dsde.workbench.leonardo.config.{AutoFreezeConfig, ClusterDefaultsConfig, ClusterDnsCacheConfig, ClusterFilesConfig, ClusterResourcesConfig, DataprocConfig, MonitorConfig, ProxyConfig, SwaggerConfig, ZombieClusterConfig}
import org.broadinstitute.dsde.workbench.leonardo.config.{AutoFreezeConfig, ClusterBucketConfig, ClusterDefaultsConfig, ClusterDnsCacheConfig, ClusterFilesConfig, ClusterResourcesConfig, DataprocConfig, MonitorConfig, ProxyConfig, SwaggerConfig, ZombieClusterConfig}
import org.broadinstitute.dsde.workbench.leonardo.dao.google.MockGoogleComputeDAO
import org.broadinstitute.dsde.workbench.leonardo.dao.{MockJupyterDAO, MockSamDAO}
import org.broadinstitute.dsde.workbench.leonardo.db.TestComponent
@@ -60,6 +60,7 @@ trait CommonTestData{ this: ScalaFutures =>
val clusterUrlBase = dataprocConfig.clusterUrlBase
val serviceAccountsConfig = config.getConfig("serviceAccounts.config")
val monitorConfig = config.as[MonitorConfig]("monitor")
val clusterBucketConfig = config.as[ClusterBucketConfig]("clusterBucket")
val contentSecurityPolicy = config.as[Option[String]]("jupyterConfig.contentSecurityPolicy").getOrElse("default-src: 'self'")
val mockJupyterDAO = new MockJupyterDAO
val singleNodeDefaultMachineConfig = MachineConfig(Some(clusterDefaultsConfig.numberOfWorkers), Some(clusterDefaultsConfig.masterMachineType), Some(clusterDefaultsConfig.masterDiskSize))
@@ -85,7 +86,7 @@ trait CommonTestData{ this: ScalaFutures =>
val rstudioImage = ClusterImage(RStudio, "rocker/tidyverse:latest", Instant.now)

def makeDataprocInfo(index: Int): DataprocInfo = {
DataprocInfo(Option(UUID.randomUUID()), Option(OperationName("operationName" + index.toString)), Option(GcsBucketName("stagingBucketName" + index.toString)), Some(IP("numbers.and.dots")))
DataprocInfo(Option(UUID.randomUUID()), Option(OperationName("operationName" + index.toString)), Option(GcsBucketName("stagingbucketname" + index.toString)), Some(IP("numbers.and.dots")))
}

def makeCluster(index: Int): Cluster = {
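
The fixture's staging bucket name is lowercased here presumably because GCS bucket names cannot contain uppercase letters, so a mixed-case name would not survive the gs:// path parsing exercised by the new getStagingBucket query. A minimal sketch of that naming rule (the helper below is illustrative, not a workbench-libs API):

// GCS bucket names must be lowercase: letters, digits, dashes, underscores and dots (3-63 chars for simple names).
val bucketNamePattern = "^[a-z0-9][a-z0-9._-]{1,61}[a-z0-9]$".r

def isValidBucketName(name: String): Boolean =
  bucketNamePattern.pattern.matcher(name).matches()

isValidBucketName("stagingBucketName1") // false: uppercase letters are not allowed
isValidBucketName("stagingbucketname1") // true
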
@@ -52,7 +52,7 @@ class LeonardoModelSpec extends TestComponent with FlatSpecLike with Matchers wi
| "createdDate": "2018-08-07T10:12:35Z",
| "labels": {},
| "jupyterExtensionUri": "gs://extension_bucket/extension_path",
| "stagingBucket": "stagingBucketName1",
| "stagingBucket": "stagingbucketname1",
| "errors": [],
| "instances": [],
| "dateAccessed": "2018-08-07T10:12:35Z",
