logo

Lucas Katayama/Server-sent events

Created Sun, 02 Aug 2020 22:41:58 -0300 Modified Sun, 02 Aug 2020 22:41:58 -0300
909 Words

Server-sent events

From Wikipedia

Server-Sent Events (SSE) is a server push technology enabling a client to receive automatic updates from a server via HTTP connection. The Server-Sent Events EventSource API is standardized as part of HTML5[1] by the W3C.

Server-sent event API with Gin

The simplest SSE handler should look like this:

package main

import (
	"io"
	"log"
	"time"

	"github.com/gin-gonic/gin"
)

func StreamHandler(c * gin.Context) {
	c.Writer.Header().Set("Content-Type", "text/event-stream")
	c.Writer.Header().Set("Cache-Control", "no-cache")

	gone := c.Stream(func(w io.Writer) bool {
		c.Writer.Write([]byte(time.Now().Format(time.RFC3339Nano) + "\n"))
		c.Writer.Flush()
	})
	if gone {
		// do something after client is gone
		log.Println("client is gone")
	}
}

func main() {
	r := gin.New()

	r.GET("/stream", StreamHandler)

	r.Run("localhost:5000")
}

The gin helper c.Stream(step func(w io.Writer) bool) creates an inifite loop until the client disconnects. For each loop, the step function is called.

In that example, we write to the response the current time.RFC3339Nano to the client.

When client disconnects, the c.Stream returns true.

This is a pretty simple case. A more complex one is if you want to retrieve information from somewhere else and send it to the client.

You just need to rewrite the step function:

gone := c.Stream(func(w io.Writer) bool {
    // retrieve information
    data := getInfomation()

    // send it to the client, notice the \n
    w.Write([]byte(data + "\n"))
    return true
})

This is an active model on the server side. Meaning, the server has to get the information.

graph LR
	subgraph broker
	Data1/Data2/.../DataN --> Broker
	end
	Server --get--> Broker
	
	Server --> Client

But what if you want a passive model, when the server receives and send it to the client?

graph LR
	subgraph broker
	Data1/Data2/.../DataN --> Broker
	end
	Broker --push--> Server
	
	Server --> Client

Understading Gin helper

The underlying code of c.Stream is :

func (c *Context) Stream(step func(w io.Writer) bool) bool {
	w := c.Writer
	clientGone := w.CloseNotify()
	for {
		select {
		case <-clientGone:
			return true
		default:
			keepOpen := step(w)
			w.Flush()
			if !keepOpen {
				return false
			}
		}
	}
}

It has 3 important parts:

  1. A client close listener
  2. A for loop maintaining the connection opened
  3. The response flush

Let’s imagine that you have a message broker and you want to send every message to the client.

To use the gin helper you need to change your code to listen to a channel of messages. Then make your broker send messages to that channel.

This could end up with a complex code with a lots of stuff to handle, like:

  • Channel closing
  • Deadlocks
  • Complex tests

But, if you use the same principles from the helper, you can achieve a simpler code.

Passive model

So you need 3 important parts:

  1. A client close listener
  2. Something to maintain the connection opened
  3. The response flush

You will end up with somthing like this:


func StreamHandler(c * gin.Context) {
	c.Writer.Header().Set("Content-Type", "text/event-stream")
	c.Writer.Header().Set("Cache-Control", "no-cache")

	Subscribe(func(msg []byte){
		c.Writer.Write(msg)
		c.Writer.Write([]byte("\n"))
		c.Writer.Flush()
	})
	c.Writer.Flush()
	<-c.Writer.CloseNotify()
	// do something after client is gone
	log.Println("client is gone")
}

func main() {
	r := gin.New()

	r.GET("/stream", StreamHandler)

	r.Run("localhost:5000")
}

Close listeners and connection holder

The c.Writer.CloseNotify() listen to the client connection and if closes, a bool is sent to the channel.

Using the following statement <-c.Writer.CloseNotify() with <- makes the current goroutine to wait for the message, holding the connection opened.

Passive message listener

The piece of code

Broker.Subscribe(func(msg []byte){
	c.Writer.Write(msg)
	c.Writer.Write([]byte("\n"))
	c.Writer.Flush()
})

Simulates a listener to a message broker, waiting for a new message. When it arrives the func (msg []byte)callback is called and the message is written to the response.

c.Writer.Flush() flushed to the client.

Another c.Writer.Flush()?

The second flush, following the broker listener is responsible to flush the headers to the client (notice that no writer.Write is called until that time). This makes an ack to the client notifying the connection was successful.

Without that flush you will see this:

 http --stream :5000/stream
>

Instead of this:

 http --stream :5000/stream
HTTP/1.1 200 OK
Cache-Control: no-cache
Content-Type: text/event-stream
Date: Mon, 03 Aug 2020 02:40:09 GMT
Transfer-Encoding: chunked
>

Until the next Flush.

Testing

A simple API test will look like this:

func TestStream(t *testing.T) {
	res := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/stream", nil)


	r := gin.New()
	r.Handle(http.MethodGet, "/stream", StreamHandler)

	r.ServeHTTP(res, req)
	
	assert.Equal(t, res.Body.String(), "msg")
}

This wont work.

Because a server-sent API maintain an opened connection. So the test stucks on r.ServeHTTP(res, req).

And you can’t simulate a client close connection.

Server-sent events testing

To test server-sent events API you need some extras. See the code below:


type StreamRecorder struct {
	*httptest.ResponseRecorder
	closeNotify chan bool
}

func (s *StreamRecorder) CloseNotify() <-chan bool {
	return s.closeNotify 
}

func (s *StreamRecorder) close() {
	s.closeNotify <- true
}

func NewStreamRecorder() *StreamRecorder{
	return &StreamRecorder{
		httptest.NewRecorder(),
		make(chan bool),
	}
}

func TestStream(t *testing.T) {
	res := NewStreamRecorder()
	req := httptest.NewRequest(http.MethodGet, "/stream", nil)

	r := gin.New()
	r.Handle(http.MethodGet, "/stream", StreamHandler)

	go r.ServeHTTP(res, req)

	// do notifications here

	//Wait flushes
	for !res.Flushed {}

	// then close
	res.close()
	assert.Equal(t, res.Body.String(), "msg")
}

Custom ResponseRecorder

To close connection you need a custom response recorder that handles the CloseNotify channel:

type StreamRecorder struct {
	*httptest.ResponseRecorder
	closeNotify chan bool
}

func (s *StreamRecorder) CloseNotify() <-chan bool {
	return s.closeNotify 
}

func (s *StreamRecorder) close() {
	s.closeNotify <- true
}

func NewStreamRecorder() *StreamRecorder{
	return &StreamRecorder{
		httptest.NewRecorder(),
		make(chan bool),
	}
}

Now you can close connection:

res.close()

Goroutine

You need a goroutine to run the request in background. That way you won’t get stuck.

go r.ServeHTTP(res, req)

Flushes

You need to wait flushes to the ResponseRecorder

for !res.Flushed {}

Done

So there it is. Simple.

References