Emulating an Event-Based Virtualization API

For the last few months at the OSL we’ve been experimenting with a program called Ganeti, a system for managing a virtualization cluster. There are a lot of benefits to using Ganeti, which I’ll save for another time, but one of my favorites is that it has an HTTP API. For example (with hostnames redacted):

russell_h@skyship ~ $ curl -k https://localhost:8889/2/nodes
[
	{
		"id": "node1.example.com",
		"uri": "/2/nodes/node1.example.com"
	},
	{
		"id": "node2.example.com",
		"uri": "/2/nodes/node2.example.com"
	},
	{
		"id": "node3.example.com",
		"uri": "/2/nodes/node3.example.com"
	},
	{
		"id": "nod4.example.com",
		"uri": "/2/nodes/node4.example.com"
	}
]

Now I don’t know about anyone else, but to me that just screams “web interface”.

One thing that has always bothered me about web interfaces to backend systems like this is that the state of the system is constantly changing. In general, when you load up the interface it takes a “snapshot” of the system, and that’s what you see until you (or some JavaScript) poll for an updated version. Compare that to a desktop application like virt-manager, which is constantly updating the view, allowing you to make decisions based on the state the system is in, rather than the state it was in. For more on the topic, albeit on a somewhat different scale, I highly recommend George Reese’s recent article, Towards Event-Driven Cloud APIs.

One solution is to just poll really often. You can even cache the poll response for a few seconds to avoid hitting the backend API every time someone reloads the page. But that’s still relatively inefficient, particularly in situations where there is a high likelihood that nothing has changed.
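
A dead-simple cache in front of the backend might look like this (CachedPoller and fetch_func are invented names for illustration, and the 5-second TTL is arbitrary):

import time

class CachedPoller(object):
	def __init__(self, fetch_func, ttl=5):
		self.fetch_func = fetch_func  # whatever actually hits the backend
		self.ttl = ttl
		self.cached = None
		self.fetched_at = 0

	def get(self):
		# Only hit the backend if the cached copy has expired
		if time.time() - self.fetched_at >= self.ttl:
			self.cached = self.fetch_func()
			self.fetched_at = time.time()
		return self.cached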

Short of coding an event-based API into Ganeti (an idea I haven’t entirely discarded), the best option I could think of was to implement a sort of event-based “middleware”. The idea is that a client (a web browser) can subscribe to a given backend URI. If there are no existing subscribers to that URI, its contents will be fetched, returned to the client, and cached for some given period, say 5 seconds. Once the cache has expired the backend will be polled again. The result will be compared to the previous, cached result, and if the contents have changed the updated version will be sent to every subscriber before being cached. If someone subscribes to an already “active” URI they will be served the cached contents, as well as any future changes.

The whole system being something of an experiment, I decided to implement it using FriendFeed’s Tornado and HTML5 WebSockets.

A backend URI is represented by its cached contents and the set of callbacks provided by subscribers:

class URI(object):
	def __init__(self, content=None, callbacks=None):
		# Avoid mutable default arguments, which Python shares between calls
		self.content = content if content is not None else {}
		self.callbacks = callbacks if callbacks is not None else []

These will be used by what I am uncreatively calling a “JSONPushProxy”:

import functools
import time

import tornado.ioloop
from tornado.httpclient import AsyncHTTPClient, HTTPRequest

class JSONPushProxy(object):
	def __init__(self, baseuri, ttl):
		self.http = AsyncHTTPClient()
		self.ttl = ttl
		self.baseuri = baseuri
		self.uris = {}
		self.ioloop = tornado.ioloop.IOLoop.instance()

	def try_callback(self, callback, *args, **kwargs):
		"""
		Wraps a callback for some special error handling (borrowed from
		Tornado)
		"""
		if callback is None:
			return None
		if args or kwargs:
			callback = functools.partial(callback, *args, **kwargs)
		def wrapper(*args, **kwargs):
			try:
				return callback(*args, **kwargs)
			except Exception:
				# TODO: logging
				raise
		return wrapper

	def subscribe(self, uri, callback):
		"""
		Subscribe to content for a URI relative to the Base URI of this PushProxy.
		New subscribers will receive an immediate content push followed by pushes
		of content every time the regularly scheduled check detects a change.
		"""
		# If there are no subscribers to this uri
		if uri not in self.uris:
			# Build a new URI
			new_uri = URI(callbacks=[callback])
			self.uris[uri] = new_uri
			# Do the first fetch run
			self.fetch(uri)
		# Otherwise just append a callback and send the current content
		else:
			self.uris[uri].callbacks.append(callback)
			self.alert(callback, uri, self.uris[uri].content)

	def unsubscribe(self, uri, callback):
		"""
		Unsubscribe a callback for a URI
		"""
		self.uris[uri].callbacks.remove(callback)
		if len(self.uris[uri].callbacks) == 0:
			# If no one is subscribing remove uri from the dictionary
			del self.uris[uri]

	def fetch(self, uri):
		"""
		When running, this will be called periodically to update the cache and
		push its contents to any subscribers.
		"""
		# Stop polling once everyone has unsubscribed
		if uri not in self.uris:
			return

		# ssl_no_verify is a small helper (defined elsewhere) that tells
		# pycurl not to verify the backend's SSL certificate
		r = HTTPRequest(self.baseuri+uri,
						prepare_curl_callback=ssl_no_verify)

		self.http.fetch(r, callback=self.try_callback(self.on_response, uri))
		# Schedule the next run
		self.ioloop.add_timeout(time.time() + self.ttl,
								self.try_callback(self.fetch, uri))

	def on_response(self, uri, response):
		"""
		Called when an HTTP response is available
		"""
		new_content = response.body
		try:
			if new_content != self.uris[uri].content:
				self.uris[uri].content = new_content
				# Push the update; iterate over a copy, since alert() may
				# remove a failing callback from the list
				for callback in list(self.uris[uri].callbacks):
					self.alert(callback, uri, new_content)
		except KeyError:
			# This can happen if the URI has been unsubscribed since the
			# request was initiated
			pass

	def alert(self, callback, uri, content):
		"""
		Call a callback, unsubscribing it if it fails
		"""
		try:
			callback(uri, content)
		except Exception:
			# Unsubscribe this callback, then let the error propagate
			self.unsubscribe(uri, callback)
			raise
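
To give a feel for how the pieces fit together, here’s a minimal usage sketch (the base URI, port, TTL, and on_update callback are all made up for illustration):

proxy = JSONPushProxy("https://localhost:8889/2", ttl=5)

def on_update(uri, content):
	# A stand-in subscriber; just print whatever changed
	print "%s changed: %s" % (uri, content)

proxy.subscribe("/nodes", on_update)
tornado.ioloop.IOLoop.instance().start()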

Calling that “rough around the edges” would be pretty generous, but it mostly works, and at least shows what I was trying to do.

For getting content to the browser, I used Tornado’s new-ish WebSocketHandler:

import tornado.escape
import tornado.websocket

class WebSocket(tornado.websocket.WebSocketHandler):
	def __init__(self, application, request):
		self.uri = None
		self.ganeti = application.settings.get('ganeti')
		super(WebSocket, self).__init__(application, request)

	def open(self):
		self.receive_message(self.on_message)

	def on_message(self, message):
		obj = tornado.escape.json_decode(message)
		if obj['action'] == 'subscribe':
			# Drop any existing subscription before adding the new one
			if self.uri:
				self.ganeti.proxy.unsubscribe(self.uri, self.gnt_callback)
			self.uri = obj['uri']
			self.ganeti.proxy.subscribe(self.uri, self.gnt_callback)
		# Re-arm to receive the next message
		self.receive_message(self.on_message)

	def gnt_callback(self, uri, content):
		# content is already a JSON string from the backend, so splice it
		# into the message rather than re-encoding it
		to_send = "{\"uri\": \"%s\", \"content\": %s}" % (uri, content)
		self.write_message(to_send)

Note that I limited the client to being subscribed to one URI at a time.
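
Tying the handler to the proxy happens through the Application settings; it looks roughly like this (the route, port, and GanetiSettings object are hypothetical stand-ins for my actual setup):

import tornado.web

class GanetiSettings(object):
	# A stand-in for whatever object actually carries the proxy around
	def __init__(self, proxy):
		self.proxy = proxy

proxy = JSONPushProxy("https://localhost:8889/2", ttl=5)
application = tornado.web.Application([
	(r"/socket", WebSocket),
], ganeti=GanetiSettings(proxy))

application.listen(8888)
tornado.ioloop.IOLoop.instance().start()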

As far as what happens client-side, that code is a total mess at the moment, but the idea is this: when the page is loaded, the browser opens a WebSocket to the handler above. When the user navigates, for example, to the list of Nodes, the browser sends a message over the WebSocket that looks something like this:

'{
	"action": "subscribe",
	"uri": "/nodes"
}'

The response will specify the URI that it is from and the actual contents of the API response. From the contents I build a table that is shown to the user. If we add a node (and assuming someone is viewing the node list), within 5 seconds the JSONPushProxy should pick up on this and message every subscriber, at which point the table is rebuilt with the new data.
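
Given the gnt_callback above, the pushed message for the node list would look something like this (contents abridged):

{
	"uri": "/nodes",
	"content": [
		{
			"id": "node1.example.com",
			"uri": "/2/nodes/node1.example.com"
		},
		...
	]
}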

There are, however, a few downsides to this approach:

First, there’s the use of WebSockets, which to my knowledge are only supported by Chrome at this point.

Second, there are some issues with displaying the data client-side: I want the user to be able to sort and filter the table, but most of the libraries I’ve seen for doing this aren’t designed to handle changing data. I’ve managed to get sorting working with the jQuery Tablesorter plugin, but there seems to be a performance issue that I have yet to diagnose. Even after that, usability issues remain. For example, a user might be about to click on a node when a new node is brought online, shifting the list and causing them to click the wrong one. Not the end of the world, and I’m honestly not sure how this is handled in desktop applications, but it’s something to think about.