分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 运营维护

Akka(37): Http:客户端操作模式

发布时间:2023-09-06 01:26责任编辑:沈小雨关键词:暂无标签

   Akka-http的客户端连接模式除Connection-Level和Host-Level之外还有一种非常便利的模式:Request-Level-Api。这种模式免除了连接Connection的概念,任何时候可以直接调用singleRequest来与服务端沟通。下面我们用几个例子来示范singleRequest的用法:

 ?(for { ???response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri="http://localhost:8011/message")) ???message <- Unmarshal(response.entity).to[String] ?} yield message).andThen { ???case Success(msg) => println(s"Received message: $msg") ???case Failure(err) => println(s"Error: ${err.getMessage}") ?}.andThen {case _ => sys.terminate()}

这是一个GET操作:用Http().singleRequest直接把HttpRequest发送给服务端uri并获取返回的HttpResponse。我们看到,整组函数的返回类型都是Future[?],所以用for-comprehension来把所有实际运算包嵌在Future运算模式内(context)。下面这个例子是客户端上传数据示范:

 (for { ???entity <- Marshal("Wata hell you doing?").to[RequestEntity] ???response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/message",entity=entity)) ???message <- Unmarshal(response.entity).to[String] ?} yield message).andThen { ???case Success(msg) => println(s"Received message: $msg") ???case Failure(err) => println(s"Error: ${err.getMessage}") ?}.andThen {case _ => sys.terminate()}

以上是个PUT操作。我们需要先构建数据载体HttpEntity。格式转换函数Marshal也返回Future[HttpEntity],所以也可以包含在for语句内。关注一下这个andThen,它可以连接一串多个monadic运算,在不影响上游运算结果的情况下实现一些副作用计算。值得注意的是上面这两个例子虽然表现形式很简洁,但我们无法对数据转换过程中的异常及response的状态码等进行监控。所以我们应该把整个过程拆分成两部分:先获取response,再具体处理response,包括核对状态,处理数据等:

 ?case class Item(id: Int, name: String, price: Double) ?def getItem(itemId: Int): Future[HttpResponse] = for { ???response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId")) ?} yield response ?def extractEntity[T](futResp: Future[HttpResponse])(implicit um: Unmarshaller[ResponseEntity,T]) = { ???futResp.andThen { ?????case Success(HttpResponse(StatusCodes.OK, _, entity, _)) => ???????Unmarshal(entity).to[T] ?????????.onComplete { ???????????case Success(t) => println(s"Got response entity: ${t}") ???????????case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}") ?????????} ?????case Success(_) => println("Exception in response!") ?????case Failure(err) => println(s"Response Failed: ${err.getMessage}") ???} ?} ?extractEntity[Item](getItem(13))

现在这个extractEntity[Item](getItem(13))可以实现全过程的监控管理了。用同样的模式实现PUT操作:

 ?def putItem(item: Item): Future[HttpResponse] = ??for { ???reqEntity <- Marshal(item).to[RequestEntity] ???response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity)) ??} yield response ?extractEntity[Item](putItem(Item(23,"Item#23", 46.0))) ????.andThen { case _ => sys.terminate()}

当然,我们还是使用了前面几篇讨论里的Marshalling方式来进行数据格式的自动转换:

import de.heikoseeberger.akkahttpjson4s.Json4sSupportimport org.json4s.jackson...trait JsonCodec extends Json4sSupport { ?import org.json4s.DefaultFormats ?import org.json4s.ext.JodaTimeSerializers ?implicit val serilizer = jackson.Serialization ?implicit val formats = DefaultFormats ++ JodaTimeSerializers.all}object JsConverters extends JsonCodec... ?import JsConverters._ ?implicit val jsonStreamingSupport = EntityStreamingSupport.json() ???.withParallelMarshalling(parallelism = 8, unordered = false)

如果我们需要对数据交换过程进行更细致的管控,用Host-Level-Api会更加适合。下面我们就针对Host-Level-Api构建一个客户端的工具库:

class PooledClient(host: String, port: Int, poolSettings: ConnectionPoolSettings) ?????????????????(implicit sys: ActorSystem, mat: ActorMaterializer) { ?import sys.dispatcher ?private val cnnPool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool] = ???Http().cachedHostConnectionPool[Int](host = host, port = port, settings = poolSettings)//单一request ?def requestSingleResponse(req: HttpRequest): Future[HttpResponse] = { ???Source.single(req -> 1) ?????.via(cnnPool) ?????.runWith(Sink.head).flatMap { ?????case (Success(resp), _) => Future.successful(resp) ?????case (Failure(fail), _) => Future.failed(fail) ???} ?}//组串request ?def orderedResponses(reqs: Iterable[HttpRequest]): Future[Iterable[HttpResponse]] = { ???Source(reqs.zipWithIndex.toMap) ?????.via(cnnPool) ?????.runFold(SortedMap[Int, Future[HttpResponse]]()) { ???????case (m, (Success(r), idx)) => m + (idx -> Future.successful(r)) ???????case (m, (Failure(f), idx)) => m + (idx -> Future.failed(f)) ?????}.flatMap { m => Future.sequence(m.values) } ?}}

下面是一种比较安全的模式:使用了queue来暂存request从而解决因发送方与接收方速率不同所产生的问题:

class QueuedRequestsClient(host: String, port: Int, poolSettings: ConnectionPoolSettings) ?????????????????????????(qsize: Int = 10, overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew) ?????????????????(implicit sys: ActorSystem, mat: ActorMaterializer) { ?import sys.dispatcher ?private val cnnPool: Flow[(HttpRequest,Promise[HttpResponse]),(Try[HttpResponse],Promise[HttpResponse]),Http.HostConnectionPool] = ???Http().cachedHostConnectionPool[Promise[HttpResponse]](host=host,port=port,settings=poolSettings) ?val queue = ???Source.queue[(HttpRequest, Promise[HttpResponse])](qsize, overflowStrategy) ?????.via(cnnPool) ?????.to(Sink.foreach({ ???????case ((Success(resp), p)) => p.success(resp) ???????case ((Failure(e), p)) ???=> p.failure(e) ?????})).run() ?def queueRequest(request: HttpRequest): Future[HttpResponse] = { ?????val responsePromise = Promise[HttpResponse]() ?????queue.offer(request -> responsePromise).flatMap { ???????case QueueOfferResult.Enqueued ???=> responsePromise.future ???????case QueueOfferResult.Dropped ????=> Future.failed(new RuntimeException("Queue overflowed. Try again later.")) ???????case QueueOfferResult.Failure(ex) => Future.failed(ex) ???????case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later.")) ?????} ?}}

下面是这些工具函数的具体使用示范:

 ?val settings = ConnectionPoolSettings(sys) ???.withMaxConnections(8) ???.withMaxOpenRequests(8) ???.withMaxRetries(3) ???.withPipeliningLimit(4) ?val pooledClient = new PooledClient("localhost",8011,settings) ?def getItemByPool(itemId: Int): Future[HttpResponse] = for { ???response <- pooledClient.requestSingleResponse(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId")) ?} yield response ?extractEntity[Item](getItemByPool(13)) ?def getItemsByPool(itemIds: List[Int]): Future[Iterable[HttpResponse]] = { ???val reqs = itemIds.map { id => ?????HttpRequest(method = HttpMethods.GET, uri = s"http://localhost:8011/item/$id") ???} ???val rets = (for { ?????responses <- pooledClient.orderedResponses(reqs) ???} yield responses) ???rets ?} ?val futResps = getItemsByPool(List(3,5,7)) ?futResps.andThen { ???case Success(listOfResps) => { ?????listOfResps.foreach { r => ???????r match { ?????????case HttpResponse(StatusCodes.OK, _, entity, _) => ???????????Unmarshal(entity).to[Item] ?????????????.onComplete { ???????????????case Success(t) => println(s"Got response entity: ${t}") ???????????????case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}") ?????????????} ?????????case _ => println("Exception in response!") ???????} ?????} ???} ???case _ => println("Failed to get list of responses!") ?} ?val queuedClient = new QueuedRequestsClient("localhost",8011,settings)() ?def putItemByQueue(item: Item): Future[HttpResponse] = ???for { ?????reqEntity <- Marshal(item).to[RequestEntity] ?????response <- queuedClient.queueRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity)) ???} yield response ?extractEntity[Item](putItemByQueue(Item(23,"Item#23", 46.0))) ???.andThen { case _ => sys.terminate()}

下面是本次讨论的示范源代码:

服务端代码:

import akka.actor._import akka.stream._import akka.http.scaladsl.Httpimport akka.http.scaladsl.server.Directives._import de.heikoseeberger.akkahttpjson4s.Json4sSupportimport org.json4s.jacksontrait JsonCodec extends Json4sSupport { ?import org.json4s.DefaultFormats ?import org.json4s.ext.JodaTimeSerializers ?implicit val serilizer = jackson.Serialization ?implicit val formats = DefaultFormats ++ JodaTimeSerializers.all}object JsConverters extends JsonCodecobject TestServer extends App with JsonCodec { ?implicit val httpSys = ActorSystem("httpSystem") ?implicit val httpMat = ActorMaterializer() ?implicit val httpEC = httpSys.dispatcher ?import JsConverters._ ?case class Item(id: Int, name: String, price: Double) ?val messages = path("message") { ???get { ?????complete("hello, how are you?") ???} ~ ???put { ?????entity(as[String]) {msg => ???????complete(msg) ?????} ???} ?} ?val items = ???(path("item" / IntNumber) & get) { id => ??????get { ????????complete(Item(id, s"item#$id", id * 2.0)) ??????} ???} ~ ?????(path("item") & put) { ???????entity(as[Item]) {item => ?????????complete(item) ???????} ????} ?val route = messages ~ items ?val (host, port) = ("localhost", 8011) ?val bindingFuture = Http().bindAndHandle(route,host,port) ?println(s"Server running at $host $port. Press any key to exit ...") ?scala.io.StdIn.readLine() ?bindingFuture.flatMap(_.unbind()) ???.onComplete(_ => httpSys.terminate())}

客户端源代码: 

import akka.actor._import akka.http.scaladsl.settings.ConnectionPoolSettingsimport akka.stream._import akka.stream.scaladsl._import akka.http.scaladsl.Httpimport akka.http.scaladsl.model._import scala.util._import de.heikoseeberger.akkahttpjson4s.Json4sSupportimport org.json4s.jacksonimport scala.concurrent._import akka.http.scaladsl.unmarshalling.Unmarshalimport akka.http.scaladsl.unmarshalling._import akka.http.scaladsl.marshalling.Marshalimport scala.collection.SortedMapimport akka.http.scaladsl.common._trait JsonCodec extends Json4sSupport { ?import org.json4s.DefaultFormats ?import org.json4s.ext.JodaTimeSerializers ?implicit val serilizer = jackson.Serialization ?implicit val formats = DefaultFormats ++ JodaTimeSerializers.all}object JsConverters extends JsonCodecclass PooledClient(host: String, port: Int, poolSettings: ConnectionPoolSettings) ?????????????????(implicit sys: ActorSystem, mat: ActorMaterializer) { ?import sys.dispatcher ?private val cnnPool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool] = ???Http().cachedHostConnectionPool[Int](host = host, port = port, settings = poolSettings) ?def requestSingleResponse(req: HttpRequest): Future[HttpResponse] = { ???Source.single(req -> 1) ?????.via(cnnPool) ?????.runWith(Sink.head).flatMap { ?????case (Success(resp), _) => Future.successful(resp) ?????case (Failure(fail), _) => Future.failed(fail) ???} ?} ?def orderedResponses(reqs: Iterable[HttpRequest]): Future[Iterable[HttpResponse]] = { ???Source(reqs.zipWithIndex.toMap) ?????.via(cnnPool) ?????.runFold(SortedMap[Int, Future[HttpResponse]]()) { ???????case (m, (Success(r), idx)) => m + (idx -> Future.successful(r)) ???????case (m, (Failure(f), idx)) => m + (idx -> Future.failed(f)) ?????}.flatMap { m => Future.sequence(m.values) } ?}}class QueuedRequestsClient(host: String, port: Int, poolSettings: ConnectionPoolSettings) ?????????????????????????(qsize: Int = 10, overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew) ?????????????????(implicit sys: ActorSystem, mat: ActorMaterializer) { ?import sys.dispatcher ?private val cnnPool: Flow[(HttpRequest,Promise[HttpResponse]),(Try[HttpResponse],Promise[HttpResponse]),Http.HostConnectionPool] = ???Http().cachedHostConnectionPool[Promise[HttpResponse]](host=host,port=port,settings=poolSettings) ?val queue = ???Source.queue[(HttpRequest, Promise[HttpResponse])](qsize, overflowStrategy) ?????.via(cnnPool) ?????.to(Sink.foreach({ ???????case ((Success(resp), p)) => p.success(resp) ???????case ((Failure(e), p)) ???=> p.failure(e) ?????})).run() ?def queueRequest(request: HttpRequest): Future[HttpResponse] = { ?????val responsePromise = Promise[HttpResponse]() ?????queue.offer(request -> responsePromise).flatMap { ???????case QueueOfferResult.Enqueued ???=> responsePromise.future ???????case QueueOfferResult.Dropped ????=> Future.failed(new RuntimeException("Queue overflowed. Try again later.")) ???????case QueueOfferResult.Failure(ex) => Future.failed(ex) ???????case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later.")) ?????} ?}}object ClientRequesting extends App { ?import JsConverters._ ?implicit val sys = ActorSystem("sysClient") ?implicit val mat = ActorMaterializer() ?implicit val ec = sys.dispatcher ?implicit val jsonStreamingSupport = EntityStreamingSupport.json() ???.withParallelMarshalling(parallelism = 8, unordered = false) ?case class Item(id: Int, name: String, price: Double) ?def extractEntity[T](futResp: Future[HttpResponse])(implicit um: Unmarshaller[ResponseEntity,T]) = { ???futResp.andThen { ?????case Success(HttpResponse(StatusCodes.OK, _, entity, _)) => ???????Unmarshal(entity).to[T] ?????????.onComplete { ???????????case Success(t) => println(s"Got response entity: ${t}") ???????????case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}") ?????????} ?????case Success(_) => println("Exception in response!") ?????case Failure(err) => println(s"Response Failed: ${err.getMessage}") ???} ?} ???(for { ???response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri="http://localhost:8011/message")) ???message <- Unmarshal(response.entity).to[String] ?} yield message).andThen { ???case Success(msg) => println(s"Received message: $msg") ???case Failure(err) => println(s"Error: ${err.getMessage}") ?} ?//.andThen {case _ => sys.terminate()} ?(for { ???entity <- Marshal("Wata hell you doing?").to[RequestEntity] ???response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/message",entity=entity)) ???message <- Unmarshal(response.entity).to[String] ?} yield message).andThen { ???case Success(msg) => println(s"Received message: $msg") ???case Failure(err) => println(s"Error: ${err.getMessage}") ?} //.andThen {case _ => sys.terminate()} ?def getItem(itemId: Int): Future[HttpResponse] = for { ???response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId")) ?} yield response ?extractEntity[Item](getItem(13)) ?def putItem(item: Item): Future[HttpResponse] = ??for { ???reqEntity <- Marshal(item).to[RequestEntity] ???response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity)) ??} yield response ?extractEntity[Item](putItem(Item(23,"Item#23", 46.0))) ????.andThen { case _ => sys.terminate()} ???val settings = ConnectionPoolSettings(sys) ???.withMaxConnections(8) ???.withMaxOpenRequests(8) ???.withMaxRetries(3) ???.withPipeliningLimit(4) ?val pooledClient = new PooledClient("localhost",8011,settings) ?def getItemByPool(itemId: Int): Future[HttpResponse] = for { ???response <- pooledClient.requestSingleResponse(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId")) ?} yield response ?extractEntity[Item](getItemByPool(13)) ?def getItemsByPool(itemIds: List[Int]): Future[Iterable[HttpResponse]] = { ???val reqs = itemIds.map { id => ?????HttpRequest(method = HttpMethods.GET, uri = s"http://localhost:8011/item/$id") ???} ???val rets = (for { ?????responses <- pooledClient.orderedResponses(reqs) ???} yield responses) ???rets ?} ?val futResps = getItemsByPool(List(3,5,7)) ?futResps.andThen { ???case Success(listOfResps) => { ?????listOfResps.foreach { r => ???????r match { ?????????case HttpResponse(StatusCodes.OK, _, entity, _) => ???????????Unmarshal(entity).to[Item] ?????????????.onComplete { ???????????????case Success(t) => println(s"Got response entity: ${t}") ???????????????case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}") ?????????????} ?????????case _ => println("Exception in response!") ???????} ?????} ???} ???case _ => println("Failed to get list of responses!") ?} ?val queuedClient = new QueuedRequestsClient("localhost",8011,settings)() ???def putItemByQueue(item: Item): Future[HttpResponse] = ???for { ?????reqEntity <- Marshal(item).to[RequestEntity] ?????response <- queuedClient.queueRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity)) ???} yield response ?extractEntity[Item](putItemByQueue(Item(23,"Item#23", 46.0))) ???.andThen { case _ => sys.terminate()} ?}

Akka(37): Http:客户端操作模式

原文地址:http://www.cnblogs.com/tiger-xc/p/7878579.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved