Server-sent events
From Wikipedia
Server-Sent Events (SSE) is a server push technology enabling a client to receive automatic updates from a server via HTTP connection. The Server-Sent Events EventSource API is standardized as part of HTML5[1] by the W3C.
Server-sent event API with Gin
The simplest SSE handler should look like this:
package main

import (
	"io"
	"log"
	"time"

	"github.com/gin-gonic/gin"
)

func StreamHandler(c *gin.Context) {
	c.Writer.Header().Set("Content-Type", "text/event-stream")
	c.Writer.Header().Set("Cache-Control", "no-cache")
	gone := c.Stream(func(w io.Writer) bool {
		// write the current time; c.Stream flushes after every step
		w.Write([]byte(time.Now().Format(time.RFC3339Nano) + "\n"))
		return true // keep the stream open
	})
	if gone {
		// do something after client is gone
		log.Println("client is gone")
	}
}

func main() {
	r := gin.New()
	r.GET("/stream", StreamHandler)
	r.Run("localhost:5000")
}
The Gin helper c.Stream(step func(w io.Writer) bool) runs an infinite loop until the client disconnects or step returns false. On each iteration, the step function is called.
In this example, the step writes the current time, formatted as time.RFC3339Nano, to the response.
When the client disconnects, c.Stream returns true.
This is a pretty simple case. A more complex one is if you want to retrieve information from somewhere else and send it to the client.
You just need to rewrite the step function:
gone := c.Stream(func(w io.Writer) bool {
	// retrieve information
	data := getInformation()
	// send it to the client, notice the \n
	w.Write([]byte(data + "\n"))
	return true
})
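The handlers in this post write plain newline-terminated lines. A browser EventSource expects each event to be framed as `data:` lines followed by a blank line, so a small formatter can help. The sketch below uses only the standard library; `formatEvent` is a hypothetical helper and not part of Gin (Gin also ships `c.SSEvent`, which may be a better fit in a real handler).

```go
package main

import (
	"fmt"
	"strings"
)

// formatEvent renders one event in the text/event-stream wire format.
// name is optional; multi-line data becomes one "data:" line per line.
// This is an illustrative helper, not a Gin API.
func formatEvent(name, data string) string {
	var b strings.Builder
	if name != "" {
		fmt.Fprintf(&b, "event: %s\n", name)
	}
	for _, line := range strings.Split(data, "\n") {
		fmt.Fprintf(&b, "data: %s\n", line)
	}
	b.WriteString("\n") // a blank line terminates the event
	return b.String()
}

func main() {
	fmt.Print(formatEvent("tick", "2020-08-03T02:40:09Z"))
}
```

Inside the step function this could be used as `io.WriteString(w, formatEvent("", data))` in place of the raw `w.Write` call.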
This step function is an active model on the server side: the server has to go and fetch the information itself.
graph LR
  subgraph broker
    Data1/Data2/.../DataN --> Broker
  end
  Server --get--> Broker
  Server --> Client
But what if you want a passive model, where the server receives the information and forwards it to the client?
graph LR
  subgraph broker
    Data1/Data2/.../DataN --> Broker
  end
  Broker --push--> Server
  Server --> Client
Understanding the Gin helper
The underlying code of c.Stream is:
func (c *Context) Stream(step func(w io.Writer) bool) bool {
	w := c.Writer
	clientGone := w.CloseNotify()
	for {
		select {
		case <-clientGone:
			return true
		default:
			keepOpen := step(w)
			w.Flush()
			if !keepOpen {
				return false
			}
		}
	}
}
It has 3 important parts:
- A client close listener
- A for loop that keeps the connection open
- The response flush
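To make those three parts concrete, here is a minimal sketch of the same loop written against the standard library only. It is not Gin's implementation: `stream` and `runStreamDemo` are hypothetical names, and the close listener uses `r.Context().Done()` because `http.CloseNotifier` is deprecated in `net/http` in favour of the request context.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// stream calls step until it returns false or the client goes away.
// Like c.Stream, it reports true when the client disconnected first.
func stream(w http.ResponseWriter, r *http.Request, step func(w http.ResponseWriter) bool) bool {
	flusher, canFlush := w.(http.Flusher)
	for { // 2. the loop keeps the connection open
		select {
		case <-r.Context().Done(): // 1. client close listener
			return true
		default:
			keepOpen := step(w)
			if canFlush {
				flusher.Flush() // 3. response flush
			}
			if !keepOpen {
				return false
			}
		}
	}
}

// runStreamDemo drives stream with an in-memory recorder and cancels the
// request context after three writes to simulate a client disconnect.
func runStreamDemo() (body string, gone bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	req := httptest.NewRequest(http.MethodGet, "/stream", nil).WithContext(ctx)
	rec := httptest.NewRecorder()
	n := 0
	gone = stream(rec, req, func(w http.ResponseWriter) bool {
		n++
		fmt.Fprintf(w, "tick %d\n", n)
		if n == 3 {
			cancel() // the "client" goes away
		}
		return true
	})
	return rec.Body.String(), gone
}

func main() {
	body, gone := runStreamDemo()
	fmt.Printf("%q gone=%v\n", body, gone)
}
```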
Let’s imagine that you have a message broker and you want to send every message to the client.
To use the gin helper you need to change your code to listen to a channel of messages. Then make your broker send messages to that channel.
This can end up as complex code with a lot to handle, such as:
- Channel closing
- Deadlocks
- Complex tests
But if you apply the same principles as the helper, you can get simpler code.
Passive model
So you need the same 3 parts:
- A client close listener
- Something to keep the connection open
- The response flush
You will end up with something like this:
func StreamHandler(c *gin.Context) {
	c.Writer.Header().Set("Content-Type", "text/event-stream")
	c.Writer.Header().Set("Cache-Control", "no-cache")
	Subscribe(func(msg []byte) {
		c.Writer.Write(msg)
		c.Writer.Write([]byte("\n"))
		c.Writer.Flush()
	})
	c.Writer.Flush()
	<-c.Writer.CloseNotify()
	// do something after client is gone
	log.Println("client is gone")
}

func main() {
	r := gin.New()
	r.GET("/stream", StreamHandler)
	r.Run("localhost:5000")
}
Close listeners and connection holder
c.Writer.CloseNotify() listens to the client connection; when the connection closes, a bool is sent on the returned channel.
Receiving from it with <-c.Writer.CloseNotify() makes the current goroutine wait for that value, which holds the connection open.
Passive message listener
The piece of code
Subscribe(func(msg []byte) {
	c.Writer.Write(msg)
	c.Writer.Write([]byte("\n"))
	c.Writer.Flush()
})
simulates a listener on a message broker, waiting for new messages. When one arrives, the func(msg []byte) callback is called and the message is written to the response.
c.Writer.Flush() then flushes it to the client.
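The post never shows what sits behind `Subscribe`. As an illustration only, here is a minimal in-memory broker with a callback-based `Subscribe`; the `Broker` type, `NewBroker`, `Publish`, and the returned unsubscribe function are all assumptions made for this sketch, not part of the original code or of any library.

```go
package main

import (
	"fmt"
	"sync"
)

// Broker is a hypothetical in-memory fan-out of messages to callbacks.
type Broker struct {
	mu   sync.Mutex
	subs map[int]func(msg []byte)
	next int
}

func NewBroker() *Broker {
	return &Broker{subs: make(map[int]func(msg []byte))}
}

// Subscribe registers fn and returns a function that removes it again.
func (b *Broker) Subscribe(fn func(msg []byte)) (unsubscribe func()) {
	b.mu.Lock()
	defer b.mu.Unlock()
	id := b.next
	b.next++
	b.subs[id] = fn
	return func() {
		b.mu.Lock()
		defer b.mu.Unlock()
		delete(b.subs, id)
	}
}

// Publish calls every current subscriber on the caller's goroutine.
func (b *Broker) Publish(msg []byte) {
	b.mu.Lock()
	fns := make([]func(msg []byte), 0, len(b.subs))
	for _, fn := range b.subs {
		fns = append(fns, fn)
	}
	b.mu.Unlock() // do not hold the lock while running callbacks
	for _, fn := range fns {
		fn(msg)
	}
}

func main() {
	b := NewBroker()
	var got []string
	unsubscribe := b.Subscribe(func(msg []byte) { got = append(got, string(msg)) })
	b.Publish([]byte("hello"))
	unsubscribe()
	b.Publish([]byte("ignored")) // no subscriber left
	fmt.Println(got)
}
```

The unsubscribe function is the design choice worth noting: a handler built on such a broker would probably call it once the client is gone, since a response writer must not be used after the handler has returned.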
Another c.Writer.Flush()?
The second flush, placed after the broker listener, is responsible for sending the headers to the client (notice that no writer.Write has been called at that point).
This acts as an ack, telling the client that the connection was successful.
Without that flush you will see this:
❯ http --stream :5000/stream
>
Instead of this:
❯ http --stream :5000/stream
HTTP/1.1 200 OK
Cache-Control: no-cache
Content-Type: text/event-stream
Date: Mon, 03 Aug 2020 02:40:09 GMT
Transfer-Encoding: chunked
>
In the first case the output stays empty until the next Flush, when the first message is written.
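The same behaviour can be observed without Gin. The sketch below is an assumption-labelled stand-in using plain `net/http`: `handler` and `headersArrive` are hypothetical names, and `r.Context().Done()` replaces `CloseNotify()`. The client call returns as soon as the flushed headers arrive, even though no body has been written.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// handler sets the SSE headers, flushes them, then holds the connection open.
func handler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	if f, ok := w.(http.Flusher); ok {
		f.Flush() // sends the status line and headers right away
	}
	<-r.Context().Done() // wait until the client goes away
}

// headersArrive starts a test server, requests the stream, and returns the
// Content-Type it received before any event was written.
func headersArrive() (string, error) {
	srv := httptest.NewServer(http.HandlerFunc(handler))
	defer srv.Close()

	// The timeout turns a missing flush into an error instead of a hang.
	client := &http.Client{Timeout: 2 * time.Second}
	res, err := client.Get(srv.URL)
	if err != nil {
		return "", err
	}
	defer res.Body.Close() // closing the body disconnects the client
	return res.Header.Get("Content-Type"), nil
}

func main() {
	contentType, err := headersArrive()
	fmt.Println(contentType, err)
}
```

If the `Flush` call is removed from `handler`, the request should fail with a timeout error instead, which mirrors the empty output shown above.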
Testing
A simple API test will look like this:
func TestStream(t *testing.T) {
	res := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/stream", nil)
	r := gin.New()
	r.Handle(http.MethodGet, "/stream", StreamHandler)
	r.ServeHTTP(res, req)
	// expected value first, actual value second
	assert.Equal(t, "msg", res.Body.String())
}
This won't work, because a server-sent events API keeps the connection open, so the test gets stuck on r.ServeHTTP(res, req).
On top of that, you can't simulate the client closing the connection.
Server-sent events testing
To test a server-sent events API you need some extras. See the code below:
package main

import (
	"net/http"
	"net/http/httptest"
	"testing"

	"github.com/gin-gonic/gin"
	"github.com/stretchr/testify/assert" // assumed: the assert calls look like testify
)

// StreamRecorder adds a CloseNotify channel to the standard recorder.
type StreamRecorder struct {
	*httptest.ResponseRecorder
	closeNotify chan bool
}

func (s *StreamRecorder) CloseNotify() <-chan bool {
	return s.closeNotify
}

// close simulates the client closing the connection.
func (s *StreamRecorder) close() {
	s.closeNotify <- true
}

func NewStreamRecorder() *StreamRecorder {
	return &StreamRecorder{
		httptest.NewRecorder(),
		make(chan bool),
	}
}

func TestStream(t *testing.T) {
	res := NewStreamRecorder()
	req := httptest.NewRequest(http.MethodGet, "/stream", nil)
	r := gin.New()
	r.Handle(http.MethodGet, "/stream", StreamHandler)
	go r.ServeHTTP(res, req)
	// do notifications here
	// wait for the flush
	for !res.Flushed {}
	// then close
	res.close()
	assert.Equal(t, "msg", res.Body.String())
}
Custom ResponseRecorder
To close the connection you need a custom response recorder that handles the CloseNotify channel:
type StreamRecorder struct {
	*httptest.ResponseRecorder
	closeNotify chan bool
}

func (s *StreamRecorder) CloseNotify() <-chan bool {
	return s.closeNotify
}

func (s *StreamRecorder) close() {
	s.closeNotify <- true
}

func NewStreamRecorder() *StreamRecorder {
	return &StreamRecorder{
		httptest.NewRecorder(),
		make(chan bool),
	}
}
Now you can close the connection:
res.close()
Goroutine
You need a goroutine to run the request in the background. That way the test won't get stuck.
go r.ServeHTTP(res, req)
Flushes
You need to wait for the flush to reach the ResponseRecorder:
for !res.Flushed {}
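The busy loop above spins a CPU core and reads Flushed from another goroutine without synchronization, which the race detector is likely to flag. One possible alternative, sketched here with the standard library only, is a recorder whose Flush closes a channel. `flushRecorder`, `waitFlushed`, and `runFlushDemo` are hypothetical names; the same idea could be folded into StreamRecorder.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
	"time"
)

// flushRecorder signals the first Flush by closing a channel.
type flushRecorder struct {
	*httptest.ResponseRecorder
	once    sync.Once
	flushed chan struct{}
}

func newFlushRecorder() *flushRecorder {
	return &flushRecorder{
		ResponseRecorder: httptest.NewRecorder(),
		flushed:          make(chan struct{}),
	}
}

// Flush records the flush as usual, then signals any waiter exactly once.
func (f *flushRecorder) Flush() {
	f.ResponseRecorder.Flush()
	f.once.Do(func() { close(f.flushed) })
}

// waitFlushed blocks until the first Flush or until d has elapsed.
func (f *flushRecorder) waitFlushed(d time.Duration) bool {
	select {
	case <-f.flushed:
		return true
	case <-time.After(d):
		return false
	}
}

// runFlushDemo serves a tiny handler in the background and waits for its flush.
func runFlushDemo() (string, bool) {
	rec := newFlushRecorder()
	req := httptest.NewRequest(http.MethodGet, "/stream", nil)
	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "msg\n")
		w.(http.Flusher).Flush()
	})
	done := make(chan struct{})
	go func() {
		defer close(done)
		handler.ServeHTTP(rec, req)
	}()
	flushed := rec.waitFlushed(time.Second)
	<-done // the handler has returned, so reading the body is safe
	return rec.Body.String(), flushed
}

func main() {
	body, flushed := runFlushDemo()
	fmt.Printf("%q flushed=%v\n", body, flushed)
}
```

The timeout in `waitFlushed` also means a handler that never flushes fails the test quickly instead of hanging it.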
Done
So there it is. Simple.