在Spray中完成请求处理后是否可以安装回调?

编程入门 行业动态 更新时间:2024-10-23 19:24:01
本文介绍了在Spray中完成请求处理后是否可以安装回调?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我正在尝试从 Spray 提供大型临时文件.一旦 HTTP 请求完成,我需要删除这些文件.到目前为止我找不到办法做到这一点......

I'm trying to serve large temporary files from Spray. I need to delete those files once HTTP request is complete. I could not find a way to do this so far...

我使用的代码类似于 this 或 this:

I'm using code similar to this or this:

respondWithMediaType(`text/csv`) { path("somepath" / CsvObjectIdSegment) { id => CsvExporter.export(id) { // loan pattern to provide temp file for this request file => encodeResponse(Gzip) { getFromFile(file) } } } }

所以本质上它调用 getFromFile 来完成 Future 中的路由.问题是,即使 Future 完成,Web 请求还没有完成.我试图写一个类似于 getFromFile 的函数,我会在 Future 的 onComplete 中调用 file.delete()> 但它有同样的问题 - 如果文件足够大,Future 在客户端完成下载文件之前完成.

So essentially it calls getFromFile which completes the route in a Future. The problem is that even if that Future is complete the web request is not complete yet. I tried to write a function similar to getFromFile and I would call file.delete() in onComplete of that Future but it has the same problem - Future completes before the client finished downloading the file if the file is large enough.

这里是来自 Spray 的 getFromFile 供参考:

Here is getFromFile from Spray for reference:

/** * Completes GET requests with the content of the given file. The actual I/O operation is * running detached in a `Future`, so it doesn't block the current thread (but potentially * some other thread !). If the file cannot be found or read the request is rejected. */ def getFromFile(file: File)(implicit settings: RoutingSettings, resolver: ContentTypeResolver, refFactory: ActorRefFactory): Route = getFromFile(file, resolver(file.getName))

我不能使用 file.deleteOnExit() 因为 JVM 可能暂时不会重新启动,并且临时文件将被放置在浪费磁盘空间的地方.

I can't use file.deleteOnExit() because JVM might not be restarted for a while and temp files will be kept laying around wasting disk space.

另一方面,这是一个更普遍的问题 - 有没有办法在 Spray 中安装回调,以便在请求处理完成时可以释放资源或更新统计信息/日志等.

On the other hand it's a more general question - is there a way to install a callback in Spray so that when request processing is complete resources can be released or statistics/logs can be updated, etc.

推荐答案

感谢 @VladimirPetrosyan 提供指针.这是我的实现方式:

Thanks to @VladimirPetrosyan for the pointer. Here is how I implemented it:

路线有这个:

trait MyService extends HttpService ... with CustomMarshallers { override def routeSettings = implicitly[RoutingSettings] ... get { respondWithMediaType(`text/csv`) { path("somepath" / CsvObjectIdSegment) { filterInstanceId => // just an ObjectId val tempResultsFile = CsvExporter.saveCsvResultsToTempFile(filterInstanceId) respondWithLastModifiedHeader(tempResultsFile.lastModified) { encodeResponse(Gzip) { complete(tempResultsFile) } } } }

和我混合的特征进行解组产生分块响应:

and the trait that I mix in that does the unmarshalling producing chunked response:

import akka.actor._ import spray.httpx.marshalling.{MarshallingContext, Marshaller} import spray.http.{MessageChunk, ChunkedMessageEnd, HttpEntity, ContentType} import spray.can.Http import spray.http.MediaTypes._ import scala.Some import java.io.{RandomAccessFile, File} import spray.routing.directives.FileAndResourceDirectives import spray.routing.RoutingSettings import math._ trait CustomMarshallers extends FileAndResourceDirectives { implicit def actorRefFactory: ActorRefFactory implicit def routeSettings: RoutingSettings implicit val CsvMarshaller = Marshaller.of[File](`text/csv`) { (file: File, contentType: ContentType, ctx: MarshallingContext) => actorRefFactory.actorOf { Props { new Actor with ActorLogging { val defaultChunkSize = min(routeSettings.fileChunkingChunkSize, routeSettings.fileChunkingThresholdSize).toInt private def getNumberOfChunks(file: File): Int = { val randomAccessFile = new RandomAccessFile(file, "r") try { ceil(randomAccessFile.length.toDouble / defaultChunkSize).toInt } finally { randomAccessFile.close } } private def readChunk(file: File, chunkIndex: Int): String = { val randomAccessFile = new RandomAccessFile(file, "r") val byteBuffer = new Array[Byte](defaultChunkSize) try { val seek = chunkIndex * defaultChunkSize randomAccessFile.seek(seek) val nread = randomAccessFile.read(byteBuffer) if(nread == -1) "" else if(nread < byteBuffer.size) new String(byteBuffer.take(nread)) else new String(byteBuffer) } finally { randomAccessFile.close } } val chunkNum = getNumberOfChunks(file) val responder: ActorRef = ctx.startChunkedMessage(HttpEntity(contentType, ""), Some(Ok(0)))(self) sealed case class Ok(seq: Int) def stop() = { log.debug("Stopped CSV download handler actor.") responder ! ChunkedMessageEnd file.delete() context.stop(self) } def sendCSV(seq: Int) = if (seq < chunkNum) responder ! MessageChunk(readChunk(file, seq)).withAck(Ok(seq + 1)) else stop() def receive = { case Ok(seq) => sendCSV(seq) case ev: Http.ConnectionClosed => log.debug("Stopping response streaming due to {}", ev) } } } } } }

创建临时文件,然后actor 开始流式传输块.每当收到来自客户端的响应时,它就会发送一个块.每当客户端断开连接时,临时文件都会被删除,actor 会被关闭.

The temp file is created and then actor starts streaming chunks. It sends a chunk whenever response from client is received. Whenever client disconnects temp file is deleted and actor is shut down.

这要求您在 spray-can 中运行您的应用,如果您在容器中运行它,我认为将无法运行.

This requires you to run your app in spray-can and I think will not work if you run it in container.

一些有用的链接:example1, example2, 文档

Some useful links: example1, example2, docs

更多推荐

在Spray中完成请求处理后是否可以安装回调?

本文发布于:2023-11-25 04:04:04,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1628246.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:回调   Spray

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!