An Introduction to Finagle by example

It is a challenging task to build a large-scale web application, there are fundamental characteristics to take into account: for example, efficiency, safety and robustness. Finagle is a asynchronous, Netty based JVM RPC system made by Twitter which makes it easy to build high available clients and servers in Java and Scala. And it can even simplify your application architecture. Here I want to show you how powerful Finagle is.

Quickstart

Let’s first have a quick look about how to create a Finagle micro web service and a Finagle http client to consume this api.Create a sbt project and import dependencies.

libraryDependencies ++= Seq(
"com.twitter" %% "finagle-http" % "6.38.0",
"org.scalatest" %% "scalatest" % "2.2.4" % "test"
)

First, let’s define a service. Here we define a service to receive a http request and get its url parameter as Integer then return a http response by plus 10.

import com.twitter.finagle.Serviceimport
import com.twitter.util.Futureimport
import com.twitter.finagle.http

// This is a plus 10 service
class PlusTenService extends Service[http.Request, http.Response] {

  override def apply(request: http.Request): Future[http.Response] = {
  	Future {
  		val input = request.getIntParam("num")
  		val output = input + 10
  		val response = http.Response(request.version, http.Status.Ok)
        response.setContentString(output.toString)
        response
    }
  }
}

Then initiate and start our server

import com.twitter.finagle.{http, Service, Http}
import com.twitter.util.Await

object QuickLookServer {
	def main(args: Array[String]): Unit = {
    val service: Service[http.Request, http.Response] = new PlusTenService    	  val server = Http.serve(":9090", service)
    Await.ready(server)
    }
}

Last let’s define a client to consume this server.

import com.twitter.finagle.{Service, Http}
import com.twitter.finagle.http
import com.twitter.util.Await

object QuickLookClient {
	def main(args: Array[String]): Unit = {
    	//define a client
    	val client: Service[http.Request, http.Response] = Http.newService("localhost:9090")
		//define a request
		val request = http.Request(http.Method.Get, "/?num=5")
		//apply request on the client
		val response = client(request)
		// print response
		response.foreach(rep => println(rep.getContentString()))
		Await.result(response)
    }
}

If you run the two application you will see the server running on localhost:9090 and client get response 15. Simple right? As you can see our service and client are both type of Service[http.Request, http.Response] . This data type really confuse me in the beginning. I will explan what’s the differences between them.

The core of Finagle

Service

Now let’s first have a look at the core of finagle Service[-Req, +Rep] . You can find the definition in com.twitter.finagle.Service . In Finagle 6.38.0 the definition of Service is an abstract class, in previous version it was a trait

abstract class Service[-Req, +Rep] extends (Req => Future[Rep])

A service is a function that takes request of type Req, and return a response of Future of Rep. This Services type are used to represent both clients and servers. To answer my previous question, the differences between service and client is that a Finagle client “imports” a Service from the network. However, a Finagle server “exports” a Service to the network.Note: the Future here is twitter future not scala future. There is no differences on conception.

Filter

Some times we want to add application-agnostic behaviour, we can use Filter to achieve this.

abstract class Filter[-ReqIn, +RepOut, +ReqOut, -RepIn]
extends ((ReqIn, Service[ReqOut, RepIn])
=> Future[RepOut])
4694105fa7ee7fd867da04057d2fae977d89cdbc yue1.png

If it is not clear please check image below.

458f8af9cbc1cacaddc01c6cbe80b347cb05de28 yue2.png

In most common cases, ReqIn is equal to ReqOut, and RepIn is equal to RepOut. So we have this SimpleFilter class

abstract class SimpleFilter[Req, Rep] extends Filter[Req, Rep, Req, Rep]

A filter can attached to client and server side. Let’s try to implement a simple timeout filter.

import com.twitter.finagle.{SimpleFilter, Service}
import com.twitter.util.{Duration, Timer, Future}

class TimeoutFilter[Req, Rep](timeout: Duration, timer: Timer)
extends SimpleFilter[Req, Rep] {

  def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = {
  	val res = service(request)
  	res.within(timer, timeout)
  }
}

Here, a timeout filter is a class extends SimpleFilter trait. Below is how to use this filter on client side

client = Http.newService("localhost:9090")

val timeoutFilter = new TimeoutFilter[http.Request, http.Response](Duration.fromSeconds(1),
new JavaTimer(false))

val clientWithTimeoutFilter = timeoutFilter.andThen(quickLookClient)

A filter can be applied on server side too. Here is an example. First let’s define a filter.

class CountFilter[Req, Rep](countClient: Service[http.Request, http.Response]) extends SimpleFilter[Req, Rep] {

	override def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = {
    val countRequest = http.Request(http.Method.Post, "/?count=5")
    countClient(countRequest)    service(request)
    }
  }

And then let’s use it on our plusTen service

val service: Service[http.Request, http.Response] = new PlusTenService

val countClient = Http.newService("localhost:9010")

val countFilter = new CountFilter[http.Request, http.Response](countClient)

val serviceWithCountFilter = countFilter.andThen(service)

You may notice the way to chain filter and service together is by using andThen method. Actually andThen method can not only chain filter with service but also chain multiple filters, like filter1 andThen filter2 andThen myservice

Client

This is the part that I like the most in finagle. Finagle http client is designed to maximize success and minimize latency. Each request will flow through various modules. These modules are logically separated into three stacks: Client stack, Endpoint stack, connection stack.

Client stack

manages name resolution and balances requests across multiple endpoints.

Endpoint stack

provides circuit breakers and connection pooling.

connection stack

provides connection life-cycle management and implements the wire protocol.

To use finagle http client is very simple. Define a client first and define a http request, then apply request on the client.

// create a http client
val client = Http.client.newService("example.com:80")

// create a http requestval
req = Request("/foo", ("my-query-string", "bar"))
// apply request on the client

val resp: Future[Response] = client(req)Note: client(req) is equal to client.apply(req)

What I want to emphasis here is the Load Balancer module. This module brings a lot of benefit for your application. It can simplify your application infstracture. Let’s compare it with traditional solution.

acad2a62ff0446edb420434d1a76e023c51abff9 yue3.png

As you can see, the traditional solution highly rely on nginx as load balancer, once nginx dead your service is not reachable, in real production environment, you have master-slave nginx wiht keeplived installed on nginx machine for heartbeat detection. This looks really complex, what about if we can get rid of these nginx?Let’s have look at following code.

val name: Name = Name.bound(Address("localhost", 10010), Address("localhost", 10011), Address("localhost", 10012))

//define a client
val client: Service[http.Request, http.Response] = Http.newService(name, "client")

This means you supply three addresses and put it into finagle http client. Finagle client will dispatch the request to one of address based on certain load balance algorithmn. The default algorithmn is "Exponentially Weighted Moving Average (EWMA)". Now your infstracture architechture becomes like following

aea2e38205fabfd9b748a93d987d0a6f63b18c2f yue4.png

Pretty simple right. Your apis talk to each other directly.

Protocol-agnostic

Finagle is a protocol-agnostic RPC system. It means Finagle supports every protocol if people implement it. For example: finagle-thrift is using thrift protocol. finagle-mysql implements the mysql protocol.Now, let’s look at this scenario

c296b8d67cde582e27438c12a3f22c8785509824 yue5.png

We want to make a api count service to count how many times the web service has been called. In section Service and Filter. We send http request and put number as query parameter. It just feel strange that I just want to send a number to count server, to achieve that I have to send a http request. Because I don’t use any data from header, cookie and body. If the application is running on AWS, it those junk information cost money. So it’s ideal to just send a integer number to api count service. Let’s implement this by customize finagle protocol.First, we should tell finagle how to converts an scodec codec into a Netty encoder

import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
import org.jboss.netty.channel.{Channel, ChannelHandlerContext}
import org.jboss.netty.handler.codec.oneone.{OneToOneDecoder,OneToOneEncoder}
import scodec.Codec
import scodec.bits.BitVector

trait CodecConversions {
/**
 * Converts an scodec codec into a Netty encoder.
 */
 protected def encoder[A: Codec] = new OneToOneEncoder {
	override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: Object) =
	ChannelBuffers.wrappedBuffer(
      Codec.encodeValid(msg.asInstanceOf[A]).toByteBuffer)
 }

 /**
  * Converts an scodec codec into a Netty decoder.
  */
protected def decoder[A: Codec] = new OneToOneDecoder {
  override def decode(ctx: ChannelHandlerContext, channel: Channel, msg: Object) =
  msg match {
     case cb: ChannelBuffer =>
     	Codec.decodeValidValue[A (BitVector(cb.toByteBuffer)).asInstanceOf[Object]
      case other => other
      }
  }
}

And then channel pipeline and codec factories

trait Factories {
	this: CodecConversions =>
    	import com.twitter.finagle.{Codec => FinagleCodec, CodecFactory}
        import org.jboss.netty.channel.{ChannelPipelineFactory, Channels}

  		/**
         * Creates a Netty channel pipeline factory given input and output types.   */

    private[this] def pipeline[I: Codec, O: Codec] = new ChannelPipelineFactory {
    def getPipeline = {
    	val pipeline = Channels.pipeline()
        pipeline.addLast("encoder", encoder[I])
        pipeline.addLast("decoder", decoder[O])
        pipeline
    }
  }
  /**
  * Creates a Finagle codec factory given input and output types.   */

  protected def codecFactory[I: Codec, O: Codec] = new CodecFactory[I, O] {

	def server = Function.const {
  		new FinagleCodec[I, O] { def pipelineFactory = pipeline[O, I] }
  	}

    def client = Function.const {
    	new FinagleCodec[I, O] { def pipelineFactory = pipeline[I, O] }
    }
  }
}

And then the code that actually creates our Finagle server and client

import java.net.InetSocketAddress

import com.twitter.conversions.time._
import com.twitter.finagle.Service
import com.twitter.finagle.builder.{ClientBuilder, ServerBuilder}
import com.twitter.util.{Duration, Future}
import scodec.Codec

object IntegerServerAndClient extends Factories with CodecConversions {

  /**
   * Creates a Finagle server from a service that we have scodec codecs
   * for both the input and output types.    */

	def server[I, O](port: Int)(service: Service[I, O])(implicit ic: Codec[I], oc: Codec[O]) =
    ServerBuilder()
  		.name("server")
  		.codec(codecFactory[I, O])
  		.bindTo(new InetSocketAddress(port))
        .build(service)

  /**
   * Creates a Finagle client given input and output types with scodec codecs.
   */

   def client[I, O](host: String, timeout: Duration = 3.second)           (implicit ic: Codec[I], oc: Codec[O]) =
   ClientBuilder()
  	.name("client")
  	.codec(codecFactory[I, O])
  	.hosts(host)
  	.timeout(timeout)
  	.build()
}

Define our simple service

import com.twitter.finagle.Service
import com.twitter.util.Future

class IntegerService extends Service[Int, Int] {
	var count = 0
    override def apply(request: Int): Future[Int] = {
    	Future.value(count + request)
    }
}

Run a server

import com.twitter.finagle.Service
import com.twitter.util.Await
import scodec.codecs.implicits.{ implicitIntCodec => _, _ }

object Server {
	def main(args: Array[String]): Unit = {
    	implicit val intgerCodec = scodec.codecs.uint8

    	val service: Service[Int, Int] = new IntegerService
    	val server = IntegerServerAndClient.server[Int, Int](9191)(service)
        Await.ready(server)
    }
}

Run a client

import com.twitter.finagle.Service
import com.twitter.util.Await
import scodec.codecs.implicits.{ implicitIntCodec => _, _ }

object Client {

	def main(args: Array[String]): Unit = {

   	 	implicit val intgerCodec = scodec.codecs.uint8

    	//define a client
    	val client: Service[Int, Int] = IntegerServerAndClient.client[Int, Int]("localhost:9191")

    	//define a request
    	val request = 4
    	//apply request on the client
    	val response = client(request)
    	//print response
    	response.foreach(rep => println(s"This is response $rep"))
    	Await.result(response)
    }
}

Conclusion

Finagle is a very flexible asychronous, protocol-agnostic RPC framework. It can help you to build high performance micro service with any protocol. It is worth to take a look at Finch the web framework based on Finagle. You can find more detail introduction from Twitter blog and more detailed example from Twitter scala school.