Skip to content

Commit

Permalink
Synchronous jupyter_localize_extension (#309)
Browse files Browse the repository at this point in the history
* jupyter_localize_extension.py: add synchronous mode and better input sanitization

* tornado better

* Update swagger and beef up localization automation tests

* Improve loc/deloc tests; they should pass now

* PR feedback, and made sync mode the default

* variable name change
  • Loading branch information
rtitle authored Apr 18, 2018
1 parent aef113c commit 2a652d0
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ object Leonardo extends RestClient with LazyLogging {
def contentsPath(googleProject: GoogleProject, clusterName: ClusterName, contentPath: String): String = {
  // Jupyter contents API endpoint for a file on the given cluster's notebook server.
  val base = notebooksPath(googleProject, clusterName)
  s"$base/api/contents/$contentPath"
}

def localizePath(googleProject: GoogleProject, clusterName: ClusterName): String =
s"${notebooksPath(googleProject, clusterName)}/api/localize"
def localizePath(googleProject: GoogleProject, clusterName: ClusterName, async: Boolean = false): String = {
  // Append the ?async=true query string only when asynchronous localization is requested.
  val asyncQuery = if (async) "?async=true" else ""
  s"${notebooksPath(googleProject, clusterName)}/api/localize" + asyncQuery
}

def get(googleProject: GoogleProject, clusterName: ClusterName)(implicit token: AuthToken, webDriver: WebDriver): NotebooksListPage = {
val path = notebooksPath(googleProject, clusterName)
Expand All @@ -165,8 +165,8 @@ object Leonardo extends RestClient with LazyLogging {
parseResponse(getRequest(url + path))
}

def localize(googleProject: GoogleProject, clusterName: ClusterName, locMap: Map[String, String])(implicit token: AuthToken): String = {
val path = localizePath(googleProject, clusterName)
/**
 * POSTs a localization map to the cluster's Jupyter localize endpoint.
 *
 * @param locMap map of destination -> source paths (note: keys are destinations)
 * @param async  when true, adds ?async=true so the server returns immediately;
 *               otherwise the request reflects the outcome of the copy (per the API docs)
 * @return the raw response body
 */
def localize(googleProject: GoogleProject, clusterName: ClusterName, locMap: Map[String, String], async: Boolean = false)(implicit token: AuthToken): String = {
  val path = localizePath(googleProject, clusterName, async)
  logger.info(s"Localize notebook files: POST /$path")
  // The Jupyter proxy authenticates via the LeoToken cookie rather than a bearer header.
  val cookie = Cookie(HttpCookiePair("LeoToken", token.value))
  postRequest(url + path, locMap, httpHeaders = List(cookie))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import org.broadinstitute.dsde.workbench.service.test.WebBrowserSpec
import org.broadinstitute.dsde.workbench.leonardo.ClusterStatus.ClusterStatus
import org.broadinstitute.dsde.workbench.leonardo.StringValueClass.LabelMap
import org.broadinstitute.dsde.workbench.model.WorkbenchEmail
import org.broadinstitute.dsde.workbench.model.google.{GcsBucketName, GcsObjectName, GcsPath, GoogleProject, ServiceAccountName, generateUniqueBucketName}
import org.broadinstitute.dsde.workbench.model.google._
import org.broadinstitute.dsde.workbench.util.LocalFileUtil
import org.openqa.selenium.WebDriver
import org.scalactic.source.Position
Expand Down Expand Up @@ -381,6 +381,66 @@ trait LeonardoTestUtils extends WebBrowserSpec with Matchers with Eventually wit
testResult.get
}

/**
 * Fixture for localize/delocalize round-trip tests.
 *
 * Sets up, in order: a new GCS bucket owned-accessible by the user's pet service account,
 * a bucket object containing `fileToLocalizeContents`, and a notebook on the cluster into
 * which `fileToDelocalizeContents` is written as `fileToDelocalize`. It then builds the
 * localize request map (localize entry: local destination -> gs:// source; delocalize
 * entry: gs:// destination -> local source) and passes it, with the bucket name, to
 * `testCode`. Files created on the cluster are removed afterwards; bucket/object cleanup
 * is handled by the enclosing fixtures. Any exception from `testCode` is re-thrown after
 * cleanup via `testResult.get`.
 */
def withLocalizeDelocalizeFiles[T](cluster: Cluster, fileToLocalize: String, fileToLocalizeContents: String, fileToDelocalize: String, fileToDelocalizeContents: String)
                                  (testCode: (Map[String, String], GcsBucketName) => T)
                                  (implicit webDriver: WebDriver, token: AuthToken): T = {
  implicit val patienceConfig: PatienceConfig = storagePatience

  withNewGoogleBucket(cluster.googleProject) { bucketName =>
    // give the user's pet owner access to the bucket
    val petServiceAccount = Sam.user.petServiceAccountEmail(cluster.googleProject.value)
    googleStorageDAO.setBucketAccessControl(bucketName, GcsEntity(petServiceAccount, GcsEntityTypes.User), GcsRoles.Owner).futureValue

    // create a bucket object to localize
    val bucketObjectToLocalize = GcsObjectName(fileToLocalize)
    withNewBucketObject(bucketName, bucketObjectToLocalize, fileToLocalizeContents, "text/plain") { objectName =>
      // give the user's pet read access to the object
      googleStorageDAO.setObjectAccessControl(bucketName, objectName, GcsEntity(petServiceAccount, GcsEntityTypes.User), GcsRoles.Owner).futureValue

      // create a notebook file to delocalize
      withNewNotebook(cluster) { notebookPage =>
        notebookPage.executeCell(s"""! echo -n "$fileToDelocalizeContents" > $fileToDelocalize""")

        // NOTE: in the localize map, keys are destinations and values are sources
        val localizeRequest = Map(
          fileToLocalize -> GcsPath(bucketName, bucketObjectToLocalize).toUri,
          GcsPath(bucketName, GcsObjectName(fileToDelocalize)).toUri -> fileToDelocalize
        )

        // run the caller's assertions inside Try so cleanup always happens
        val testResult = Try(testCode(localizeRequest, bucketName))

        // clean up files on the cluster
        // no need to clean up the bucket objects; that will happen as part of `withNewBucketObject`
        notebookPage.executeCell(s"""! rm -f $fileToLocalize""")
        notebookPage.executeCell(s"""! rm -f $fileToDelocalize""")

        // rethrow any failure from testCode after cleanup
        testResult.get
      }
    }
  }
}

/**
 * Asserts that a localize/delocalize round trip succeeded:
 *  - localization.log exists on the cluster (and is archived to test output for debugging),
 *  - the localized file exists on the notebook VM with the expected contents,
 *  - the delocalized file exists in the Google bucket with the expected contents.
 */
def verifyLocalizeDelocalize(cluster: Cluster, localizedFileName: String, localizedFileContents: String, delocalizedBucketPath: GcsPath, delocalizedBucketContents: String)(implicit token: AuthToken): Unit = {
  implicit val patienceConfig: PatienceConfig = storagePatience

  // check localization.log for existence
  val localizationLog = Leonardo.notebooks.getContentItem(cluster.googleProject, cluster.clusterName, "localization.log", includeContent = true)
  localizationLog.content shouldBe defined

  // Save localization.log to test output to aid in debugging.
  // Fix: close the stream in a finally block so a failed write doesn't leak the file handle.
  val downloadFile = new File(logDir, s"${cluster.googleProject.value}-${cluster.clusterName.string}-localization.log")
  val fos = new FileOutputStream(downloadFile)
  try {
    fos.write(localizationLog.content.get.getBytes)
  } finally {
    fos.close()
  }
  logger.info(s"Saved localization log for cluster ${cluster.googleProject.value}/${cluster.clusterName.string} to ${downloadFile.getAbsolutePath}")

  // the localized file should exist on the notebook VM
  val item = Leonardo.notebooks.getContentItem(cluster.googleProject, cluster.clusterName, localizedFileName, includeContent = true)
  item.content shouldBe Some(localizedFileContents)

  // the delocalized file should exist in the Google bucket
  val data = googleStorageDAO.getObject(delocalizedBucketPath.bucketName, delocalizedBucketPath.objectName).futureValue
  data.map(_.toString) shouldBe Some(delocalizedBucketContents)
}

def verifyHailImport(notebookPage: NotebookPage, vcfPath: GcsPath, clusterName: ClusterName): Unit = {
val hailTimeout = 5 minutes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package org.broadinstitute.dsde.workbench.leonardo
import java.io.File
import java.nio.file.Files

import org.broadinstitute.dsde.workbench.service.{Orchestration, Sam}
import org.broadinstitute.dsde.workbench.service.{Orchestration, RestException, Sam}
import org.broadinstitute.dsde.workbench.ResourceFile
import org.broadinstitute.dsde.workbench.auth.AuthToken
import org.broadinstitute.dsde.workbench.dao.Google.googleStorageDAO
import org.broadinstitute.dsde.workbench.fixture.BillingFixtures
import org.broadinstitute.dsde.workbench.model.google.{GcsEntity, GcsEntityTypes, GcsObjectName, GcsRoles, GoogleProject}
import org.broadinstitute.dsde.workbench.model.google.{GcsEntity, GcsEntityTypes, GcsObjectName, GcsPath, GcsRoles, GoogleProject}
import org.scalatest.{BeforeAndAfterAll, FreeSpec}

import scala.concurrent.duration._
Expand Down Expand Up @@ -86,33 +86,62 @@ class NotebookInteractionSpec extends FreeSpec with LeonardoTestUtils with Befor
}
}

"should localize files" in withWebDriver { implicit driver =>
withFileUpload(ronCluster, hailUploadFile) { _ =>
//good data
val goodLocalize = Map(
"test.rtf" -> s"$swatTestBucket/test.rtf"
//TODO: create a bucket and upload to there
//"gs://new_bucket/import-hail.ipynb" -> "import-hail.ipynb"
)
// Exercises POST /api/localize?async=true: the request returns 200 immediately, the
// copies land eventually, and a bad source still returns 200 (errors only surface in
// localization.log), so we verify the bad file was never created on the VM.
"should localize files in async mode" in withWebDriver { implicit driver =>
  val localizeFileName = "localize_async.txt"
  val localizeFileContents = "Async localize test"
  val delocalizeFileName = "delocalize_async.txt"
  val delocalizeFileContents = "Async delocalize test"

  withLocalizeDelocalizeFiles(ronCluster, localizeFileName, localizeFileContents, delocalizeFileName, delocalizeFileContents) { (localizeRequest, bucketName) =>
    // call localize; this should return 200
    Leonardo.notebooks.localize(ronCluster.googleProject, ronCluster.clusterName, localizeRequest, async = true)

    // check that the files are eventually at their destinations
    implicit val patienceConfig: PatienceConfig = localizePatience
    eventually {
      verifyLocalizeDelocalize(ronCluster, localizeFileName, localizeFileContents, GcsPath(bucketName, GcsObjectName(delocalizeFileName)), delocalizeFileContents)
    }

    // call localize again with bad data. This should still return 200 since we're in async mode.
    val badLocalize = Map("file.out" -> "gs://nobuckethere")
    Leonardo.notebooks.localize(ronCluster.googleProject, ronCluster.clusterName, badLocalize, async = true)

    // it should not have localized this file
    val thrown = the [RestException] thrownBy {
      Leonardo.notebooks.getContentItem(ronCluster.googleProject, ronCluster.clusterName, "file.out", includeContent = false)
    }
    // why doesn't `RestException` have a status code field?
    thrown.message should include ("No such file or directory: file.out")
  }
}

val localizationLog = Leonardo.notebooks.getContentItem(ronCluster.googleProject, ronCluster.clusterName, "localization.log")
localizationLog.content shouldBe defined
localizationLog.content.get shouldNot include("Exception")
// Exercises POST /api/localize in (default) sync mode: the request blocks until the copy
// completes, files are verifiable immediately, and a bad source yields a 500 whose message
// points at localization.log; the bad file must not exist on the VM afterwards.
"should localize files in sync mode" in withWebDriver { implicit driver =>
  val localizeFileName = "localize_sync.txt"
  val localizeFileContents = "Sync localize test"
  val delocalizeFileName = "delocalize_sync.txt"
  val delocalizeFileContents = "Sync delocalize test"

  withLocalizeDelocalizeFiles(ronCluster, localizeFileName, localizeFileContents, delocalizeFileName, delocalizeFileContents) { (localizeRequest, bucketName) =>
    // call localize; this should return 200
    Leonardo.notebooks.localize(ronCluster.googleProject, ronCluster.clusterName, localizeRequest, async = false)

    // check that the files are immediately at their destinations
    verifyLocalizeDelocalize(ronCluster, localizeFileName, localizeFileContents, GcsPath(bucketName, GcsObjectName(delocalizeFileName)), delocalizeFileContents)

    // call localize again with bad data. This should still return 500 since we're in sync mode.
    val badLocalize = Map("file.out" -> "gs://nobuckethere")
    val thrown = the [RestException] thrownBy {
      Leonardo.notebooks.localize(ronCluster.googleProject, ronCluster.clusterName, badLocalize, async = false)
    }
    // why doesn't `RestException` have a status code field?
    thrown.message should include ("500 : Internal Server Error")
    thrown.message should include ("Error occurred during localization. See localization.log for details.")

    // it should not have localized this file
    val contentThrown = the [RestException] thrownBy {
      Leonardo.notebooks.getContentItem(ronCluster.googleProject, ronCluster.clusterName, "file.out", includeContent = false)
    }
    contentThrown.message should include ("No such file or directory: file.out")
  }
}

Expand Down
35 changes: 27 additions & 8 deletions jupyter-docker/jupyter_localize_extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@ def sanitize(self, pathstr):
@gen.coroutine
def localize(self, pathdict):
    """Treats the given dict as a string/string map and sends it to gsutil.

    NOTE: keys are destinations, values are sources. Each command and any
    gsutil stderr are appended to localization.log in the notebook working
    directory. Returns True iff every gsutil invocation exited with 0.
    """
    all_success = True
    # This gets dropped inside the user's notebook working directory
    with open("localization.log", 'a', buffering=1) as locout:
        for key in pathdict:
            # Pass an argv list (no shell=True) so sanitized paths can't be re-interpreted.
            cmd = ['gsutil', '-m', '-q', 'cp', '-R', '-c', '-e', self.sanitize(pathdict[key]), self.sanitize(key)]
            locout.write(' '.join(cmd) + '\n')
            code = subprocess.call(cmd, stderr=locout)
            # Fix: `code is not 0` compares identity, not value; it only works by
            # accident of CPython's small-int interning. Use != for correctness.
            if code != 0:
                all_success = False
    return all_success

def post(self):
try:
Expand All @@ -43,12 +47,27 @@ def post(self):
if not all(map(lambda v: type(v) is unicode, pathdict.values())):
raise HTTPError(400, "Body must be JSON object of type string/string")

#complete the request HERE, without waiting for the localize to run
self.set_status(200)
self.finish()
async = self.get_query_argument('async', False)

#fire and forget the actual work -- it'll log to a file in the user's homedir
tornado.ioloop.IOLoop.current().spawn_callback(self.localize, pathdict)
if async:
#complete the request HERE, without waiting for the localize to run
self.set_status(200)
self.finish()

#fire and forget the actual work -- it'll log to a file in the user's homedir
tornado.ioloop.IOLoop.current().spawn_callback(self.localize, pathdict)

else:
#run localize synchronous to the HTTP request
#run_sync() doesn't take arguments, so we must wrap the call in a lambda.
success = tornado.ioloop.IOLoop().run_sync(lambda: self.localize(pathdict))

#complete the request only after localize completes
if not success:
raise HTTPError(500, "Error occurred during localization. See localization.log for details.")
else:
self.set_status(200)
self.finish()

def load_jupyter_server_extension(nb_server_app):
"""Entrypoint for the Jupyter extension."""
Expand Down
13 changes: 12 additions & 1 deletion src/main/resources/swagger/api-docs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,10 @@ paths:
summary: Localize files to/from a Jupyter notebook server
description: |
Sends a command to a Jupyter notebook server to localize files between the server and GCS.
This operation will happen asynchronously; output, including any errors, will appear in `localization.log` in the working directory of the Jupyter notebook server.
Output, including any errors, will appear in `localization.log` in the working directory of the Jupyter notebook server.
By default this operation will happen synchronously and the response status will reflect any errors encountered in the copy.
However, if the `async` parameter is specified then the localization will happen asynchronously to the request, and the API will always return 200.
operationId: proxyLocalize
tags:
- notebooks
Expand All @@ -350,6 +353,14 @@ paths:
description: clusterName
required: true
type: string
- in: query
name: async
description: |
If true, the copy will happen asynchronously to the request and the API will always return 200.
If false (the default), the copy will happen synchronously and the response will reflect any errors encountered during the copy.
required: false
type: boolean
default: false
- in: body
description: |
JSON object. Keys represent destinations, values represent sources.
Expand Down

0 comments on commit 2a652d0

Please sign in to comment.