@@ -13,14 +13,15 @@ import cats.effect.std.Supervisor
1313import cats .syntax .all ._
1414import cats .effect .syntax .all ._
1515import jsonrpclib .internals .MessageDispatcher
16- import jsonrpclib .internals ._
1716
1817import scala .util .Try
18+ import java .util .regex .Pattern
1919
2020trait FS2Channel [F [_]] extends Channel [F ] {
2121
22- def input : Pipe [F , Payload , Unit ]
23- def output : Stream [F , Payload ]
22+ def input : Pipe [F , Message , Unit ]
23+ def inputOrBounce : Pipe [F , Either [ProtocolError , Message ], Unit ]
24+ def output : Stream [F , Message ]
2425
2526 def withEndpoint (endpoint : Endpoint [F ])(implicit F : Functor [F ]): Resource [F , FS2Channel [F ]] =
2627 Resource .make(mountEndpoint(endpoint))(_ => unmountEndpoint(endpoint.method)).map(_ => this )
@@ -52,8 +53,8 @@ object FS2Channel {
5253 ): Stream [F , FS2Channel [F ]] = {
5354 for {
5455 supervisor <- Stream .resource(Supervisor [F ])
55- ref <- Ref [F ].of(State [F ](Map .empty, Map .empty, Map .empty, 0 )).toStream
56- queue <- cats.effect.std.Queue .bounded[F , Payload ](bufferSize).toStream
56+ ref <- Ref [F ].of(State [F ](Map .empty, Map .empty, Map .empty, Vector .empty, 0 )).toStream
57+ queue <- cats.effect.std.Queue .bounded[F , Message ](bufferSize).toStream
5758 impl = new Impl (queue, ref, supervisor, cancelTemplate)
5859
5960 // Creating a bespoke endpoint to receive cancelation requests
@@ -73,6 +74,7 @@ object FS2Channel {
7374 runningCalls : Map [CallId , Fiber [F , Throwable , Unit ]],
7475 pendingCalls : Map [CallId , OutputMessage => F [Unit ]],
7576 endpoints : Map [String , Endpoint [F ]],
77+ globEndpoints : Vector [(Pattern , Endpoint [F ])],
7678 counter : Long
7779 ) {
7880 def nextCallId : (State [F ], CallId ) = (this .copy(counter = counter + 1 ), CallId .NumberId (counter))
@@ -82,11 +84,27 @@ object FS2Channel {
8284 val result = pendingCalls.get(callId)
8385 (this .copy(pendingCalls = pendingCalls.removed(callId)), result)
8486 }
85- def mountEndpoint (endpoint : Endpoint [F ]): Either [ConflictingMethodError , State [F ]] =
86- endpoints.get(endpoint.method) match {
87- case None => Right (this .copy(endpoints = endpoints + (endpoint.method -> endpoint)))
88- case Some (_) => Left (ConflictingMethodError (endpoint.method))
87+ def mountEndpoint (endpoint : Endpoint [F ]): Either [ConflictingMethodError , State [F ]] = {
88+ import endpoint .method
89+ if (method.contains(" *" )) {
90+ val parts = method
91+ .split(" \\ *" , - 1 )
92+ .map { // Don't discard trailing empty string, if any.
93+ case " " => " "
94+ case str => Pattern .quote(str)
95+ }
96+ val glob = Pattern .compile(parts.mkString(" .*" ))
97+ Right (this .copy(globEndpoints = globEndpoints :+ (glob -> endpoint)))
98+ } else {
99+ endpoints.get(endpoint.method) match {
100+ case None => Right (this .copy(endpoints = endpoints + (endpoint.method -> endpoint)))
101+ case Some (_) => Left (ConflictingMethodError (endpoint.method))
102+ }
89103 }
104+ }
105+ def getEndpoint (method : String ): Option [Endpoint [F ]] = {
106+ endpoints.get(method).orElse(globEndpoints.find(_._1.matcher(method).matches()).map(_._2))
107+ }
90108 def removeEndpoint (method : String ): State [F ] =
91109 copy(endpoints = endpoints.removed(method))
92110
@@ -98,16 +116,20 @@ object FS2Channel {
98116 }
99117
100118 private class Impl [F [_]](
101- private val queue : cats.effect.std.Queue [F , Payload ],
119+ private val queue : cats.effect.std.Queue [F , Message ],
102120 private val state : Ref [F , FS2Channel .State [F ]],
103121 supervisor : Supervisor [F ],
104122 maybeCancelTemplate : Option [CancelTemplate ]
105123 )(implicit F : Concurrent [F ])
106124 extends MessageDispatcher [F ]
107125 with FS2Channel [F ] {
108126
109- def output : Stream [F , Payload ] = Stream .fromQueueUnterminated(queue)
110- def input : Pipe [F , Payload , Unit ] = _.evalMap(handleReceivedPayload)
127+ def output : Stream [F , Message ] = Stream .fromQueueUnterminated(queue)
128+ def inputOrBounce : Pipe [F , Either [ProtocolError , Message ], Unit ] = _.evalMap {
129+ case Left (error) => sendProtocolError(error)
130+ case Right (message) => handleReceivedMessage(message)
131+ }
132+ def input : Pipe [F , Message , Unit ] = _.evalMap(handleReceivedMessage)
111133
112134 def mountEndpoint (endpoint : Endpoint [F ]): F [Unit ] = state
113135 .modify(s =>
@@ -135,8 +157,8 @@ object FS2Channel {
135157 }
136158 }
137159 protected def reportError (params : Option [Payload ], error : ProtocolError , method : String ): F [Unit ] = ???
138- protected def getEndpoint (method : String ): F [Option [Endpoint [F ]]] = state.get.map(_.endpoints.get (method))
139- protected def sendMessage (message : Message ): F [Unit ] = queue.offer(Codec .encode( message) )
160+ protected def getEndpoint (method : String ): F [Option [Endpoint [F ]]] = state.get.map(_.getEndpoint (method))
161+ protected def sendMessage (message : Message ): F [Unit ] = queue.offer(message)
140162
141163 protected def nextCallId (): F [CallId ] = state.modify(_.nextCallId)
142164 protected def createPromise [A ](callId : CallId ): F [(Try [A ] => F [Unit ], () => F [A ])] = Deferred [F , Try [A ]].map {
0 commit comments