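The sample application demonstrates the following:

  • Using channels to implement a chat room (a pub-sub model).
  • Using both Comet and Websockets.

The application's structure is as follows: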

chat/app/
	chatroom	       # Chat room goroutine
		chatroom.go

	controllers
		app.go         # Welcome screen, lets the user pick a technique
		refresh.go     # "Active Refresh" technique
		longpolling.go # "Long Polling" ("Comet") technique
		websocket.go   # "Websocket" technique

	views
		...            # HTML and JavaScript

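Browse the code

The Chat Room

First, let's look at how the chat room is implemented, in chatroom.go.

The chat room runs as an independent goroutine, started on initialization: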

func init() {
	go chatroom()
}

The chatroom() function simply selects on three channels and carries out the requested action.

var (
	// Send a channel here to get room events back.  It will send the entire
	// archive initially, and then new messages as they come in.
	subscribe = make(chan (chan<- Subscription), 10)
	// Send a channel here to unsubscribe.
	unsubscribe = make(chan (<-chan Event), 10)
	// Send events here to publish them.
	publish = make(chan Event, 10)
)

func chatroom() {
	archive := list.New()
	subscribers := list.New()

	for {
		select {
		case ch := <-subscribe:
			// Add subscriber to list and send back subscriber channel + chat log.
		case event := <-publish:
			// Send event to all subscribers and add to chat log.
		case unsub := <-unsubscribe:
			// Remove subscriber from subscriber list.
		}
	}
}

Let’s see how each of those is implemented.

Subscribe

	case ch := <-subscribe:
		var events []Event
		for e := archive.Front(); e != nil; e = e.Next() {
			events = append(events, e.Value.(Event))
		}
		subscriber := make(chan Event, 10)
		subscribers.PushBack(subscriber)
		ch <- Subscription{events, subscriber}

A Subscription is created with two properties:

  • The chat log (archive)
  • A channel that the subscriber can listen on to get new messages.

The Subscription is then sent back over the channel that the subscriber supplied.

Publish

	case event := <-publish:
		for ch := subscribers.Front(); ch != nil; ch = ch.Next() {
			ch.Value.(chan Event) <- event
		}
		if archive.Len() >= archiveSize {
			archive.Remove(archive.Front())
		}
		archive.PushBack(event)

The published event is sent to the subscribers’ channels one by one. Then the event is added to the archive, which is trimmed if necessary.

Unsubscribe

	case unsub := <-unsubscribe:
		for ch := subscribers.Front(); ch != nil; ch = ch.Next() {
			if ch.Value.(chan Event) == unsub {
				subscribers.Remove(ch)
				// Stop here: the removed element must not be used to keep iterating.
				break
			}
		}

The subscriber channel is removed from the list.
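The handlers shown later call chatroom.Subscribe(), subscription.Cancel() and chatroom.Say(), which are not listed in this walkthrough. The following is a minimal, self-contained sketch of how the three cases and such helpers might fit together. The Event fields, the archiveSize value, and the bodies of Subscribe, Cancel and Say are assumptions, not the sample's actual code.

```go
package main

import (
	"container/list"
	"fmt"
)

// Event and Subscription are simplified, assumed shapes.
type Event struct {
	User, Text string
}

type Subscription struct {
	Archive []Event      // events already in the chat log
	New     <-chan Event // events published after subscribing
}

const archiveSize = 10 // assumed value

var (
	subscribe   = make(chan (chan<- Subscription), 10)
	unsubscribe = make(chan (<-chan Event), 10)
	publish     = make(chan Event, 10)
)

func init() {
	go chatroom()
}

// Subscribe asks the chat room goroutine for a new Subscription.
func Subscribe() Subscription {
	resp := make(chan Subscription)
	subscribe <- resp
	return <-resp
}

// Cancel asks the chat room goroutine to drop this subscription's channel.
func (s Subscription) Cancel() {
	unsubscribe <- s.New
}

// Say publishes a message event.
func Say(user, text string) {
	publish <- Event{User: user, Text: text}
}

func chatroom() {
	archive := list.New()
	subscribers := list.New()
	for {
		select {
		case ch := <-subscribe:
			var events []Event
			for e := archive.Front(); e != nil; e = e.Next() {
				events = append(events, e.Value.(Event))
			}
			subscriber := make(chan Event, 10)
			subscribers.PushBack(subscriber)
			ch <- Subscription{events, subscriber}
		case event := <-publish:
			for e := subscribers.Front(); e != nil; e = e.Next() {
				e.Value.(chan Event) <- event
			}
			if archive.Len() >= archiveSize {
				archive.Remove(archive.Front())
			}
			archive.PushBack(event)
		case unsub := <-unsubscribe:
			for e := subscribers.Front(); e != nil; e = e.Next() {
				if e.Value.(chan Event) == unsub {
					subscribers.Remove(e)
					break
				}
			}
		}
	}
}

func main() {
	first := Subscribe()
	Say("alice", "hello")
	fmt.Println((<-first.New).Text) // the live event reaches the subscriber
	first.Cancel()

	second := Subscribe()
	fmt.Println(len(second.Archive)) // a later subscriber gets the event from the archive
	second.Cancel()
}
```

Because all list access happens inside the single chatroom goroutine, neither list needs a mutex; the channels are the only shared state.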

Handlers

Now that you know how the chat room works, we can look at how the handlers expose that functionality using different techniques.

Active Refresh

The Active Refresh chat room JavaScript reloads the messages panel every 5 seconds to get any new messages:

  // Scroll the messages panel to the end
  var scrollDown = function() {
    $('#thread').scrollTo('max')
  }

  // Reload the whole messages panel
  var refresh = function() {
    $('#thread').load('/refresh/room?user= #thread .message', function() {
      scrollDown()
    })
  }

  // Call refresh every 5 seconds
  setInterval(refresh, 5000)

Refresh/Room.html

This is the handler to serve that:

func (c Refresh) Room(user string) revel.Result {
	subscription := chatroom.Subscribe()
	defer subscription.Cancel()
	events := subscription.Archive
	for i := range events {
		if events[i].User == user {
			events[i].User = "you"
		}
	}
	return c.Render(user, events)
}

refresh.go

It subscribes to the chatroom and passes the archive to the template to be rendered (after changing the user name to “you” as necessary).
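The handler writes through events[i] rather than through a loop value because range hands out a copy of each element, so assigning to the copy would leave the slice unchanged. A small sketch of the difference, using a simplified Event type and a hypothetical relabel helper (both are assumptions for illustration):

```go
package main

import "fmt"

// Event is a simplified stand-in for chatroom.Event (assumed fields).
type Event struct {
	User string
	Text string
}

// relabel (hypothetical helper) replaces the given user's name with "you",
// writing through the index so the slice itself is modified.
func relabel(events []Event, user string) {
	for i := range events {
		if events[i].User == user {
			events[i].User = "you"
		}
	}
}

func main() {
	events := []Event{
		{User: "alice", Text: "hi"},
		{User: "bob", Text: "hello"},
	}

	// Assigning to the loop variable only changes a copy of the element.
	for _, e := range events {
		if e.User == "alice" {
			e.User = "you"
		}
	}
	fmt.Println(events[0].User) // prints "alice": the slice is untouched

	relabel(events, "alice")
	fmt.Println(events[0].User, events[1].User) // prints "you bob"
}
```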

Nothing much to see here.

Long Polling (Comet)

The Long Polling chat room JavaScript makes an Ajax request that the server keeps open until a new message comes in. The JavaScript sends a lastReceived timestamp to tell the server which message it saw last.

  var lastReceived = 0
  var waitMessages = '/longpolling/room/messages?lastReceived='
  var say = '/longpolling/room/messages?user='

  $('#send').click(function(e) {
    var message = $('#message').val()
    $('#message').val('')
    $.post(say, {message: message})
  });

  // Retrieve new messages
  var getMessages = function() {
    $.ajax({
      url: waitMessages + lastReceived,
      success: function(events) {
        $(events).each(function() {
          display(this)
          lastReceived = this.Timestamp
        })
        getMessages()
      },
      dataType: 'json'
    });
  }
  getMessages();

LongPolling/Room.html

And here is the handler:

func (c LongPolling) WaitMessages(lastReceived int) revel.Result {
	subscription := chatroom.Subscribe()
	defer subscription.Cancel()

	// See if anything is new in the archive.
	var events []chatroom.Event
	for _, event := range subscription.Archive {
		if event.Timestamp > lastReceived {
			events = append(events, event)
		}
	}

	// If we found one, grand.
	if len(events) > 0 {
		return c.RenderJson(events)
	}

	// Else, wait for something new.
	event := <-subscription.New
	return c.RenderJson([]chatroom.Event{event})
}

longpolling.go

In this implementation, the handler can simply block on the subscription channel, because anything newer in the archive has already been returned by the check above.

Websocket

The Websocket chat room JavaScript opens a websocket connection as soon as the user has loaded the chat room page.

  // Create a socket
  var socket = new WebSocket('ws://127.0.0.1:9000/websocket/room/socket?user=')

  // Message received on the socket
  socket.onmessage = function(event) {
    display(JSON.parse(event.data))
  }

  $('#send').click(function(e) {
    var message = $('#message').val()
    $('#message').val('')
    socket.send(message)
  });

WebSocket/Room.html

The first thing to do is to subscribe to new events, join the room, and send down the archive. Here is what that looks like:

func (c WebSocket) RoomSocket(user string, ws *websocket.Conn) revel.Result {
	// Join the room.
	subscription := chatroom.Subscribe()
	defer subscription.Cancel()

	chatroom.Join(user)
	defer chatroom.Leave(user)

	// Send down the archive.
	for _, event := range subscription.Archive {
		if websocket.JSON.Send(ws, &event) != nil {
			// They disconnected
			return nil
		}
	}

websocket.go

Next, we have to listen for new events from the subscription. However, the websocket library only provides a blocking call to get a new frame. To select between them, we have to wrap it:

	// In order to select between websocket messages and subscription events, we
	// need to stuff websocket events into a channel.
	newMessages := make(chan string)
	go func() {
		var msg string
		for {
			err := websocket.Message.Receive(ws, &msg)
			if err != nil {
				close(newMessages)
				return
			}
			newMessages <- msg
		}
	}()

websocket.go

Now we can select for new websocket messages on the newMessages channel.

The last bit does exactly that – it waits for a new message from the websocket (if the user has said something) or from the subscription (someone else in the chat room has said something) and propagates the message to the other.

	// Now listen for new events from either the websocket or the chatroom.
	for {
		select {
		case event := <-subscription.New:
			if websocket.JSON.Send(ws, &event) != nil {
				// They disconnected.
				return nil
			}
		case msg, ok := <-newMessages:
			// If the channel is closed, they disconnected.
			if !ok {
				return nil
			}

			// Otherwise, say something.
			chatroom.Say(user, msg)
		}
	}
	return nil
}

websocket.go

If we detect the websocket channel has closed, then we just return nil.