Source code for sprockets.clients.cassandra

"""
clients.cassandra
=================

Base functionality for accessing/modifying data in Cassandra.

"""
import os
import socket

from cassandra.cluster import Cluster
from tornado.concurrent import Future
from tornado.ioloop import IOLoop

try:
    from urllib.parse import urlsplit
except:
    from urlparse import urlsplit

version_info = (0, 1, 0)
__version__ = '.'.join(str(v) for v in version_info)

DEFAULT_URI = 'cassandra://localhost:9042'
DEFAULT_PORT = 9042


[docs]class CassandraConnection(object): """Maintain a connection to a Cassandra cluster. The Sprockets Cassandra client handles provides the glue needed to join the Tornado async I/O module with the native python async I/O used in the Cassandra driver. The constructor of the function will grab the current handle to the underlying Tornado I/O loop so that a Tornado future result can be returned to the host application. Configuration parameters for the module are obtained from environment variables. Currently, the only variable is ``CASSANDRA_URI``, which takes the format "cassandra://hostname". If not located, the hostname defaults to localhost. .. note:: The hostname in the ``CASSANDRA_URI`` will be resolved to a list of IP addresses that will be passed to the Cassandra driver as the contact points. """ _prepared_statement_cache = {} def __init__(self, ioloop=None): self._config = self._get_cassandra_config() self._cluster = Cluster(contact_points=self._config['contact_points'], port=self._config['port']) self._session = self._cluster.connect() self._ioloop = IOLoop.current() def _get_cassandra_config(self): """Retrieve a dict containing Cassandra client config params.""" parts = urlsplit(os.environ.get('CASSANDRA_URI', DEFAULT_URI)) if parts.scheme != 'cassandra': raise RuntimeError( 'CASSANDRA_URI scheme is not "cassandra://"!') _, _, ip_addresses = socket.gethostbyname_ex(parts.hostname) if not ip_addresses: raise RuntimeError('Unable to find Cassandra in DNS!') return { 'contact_points': ip_addresses, 'port': parts.port or DEFAULT_PORT, }
[docs] def set_keyspace(self, keyspace): """Set the keyspace used by the connection.""" self._session.set_keyspace(keyspace)
[docs] def shutdown(self): """Shutdown the connection to the Cassandra cluster.""" self._cluster.shutdown() self._session = None self._cluster = None
[docs] def prepare(self, query, name=None): """Create and cache a prepared statement using the provided query. This function will take a ``query`` and optional ``name`` parameter and will create a new prepared statement for the provided ``query``. The resulting statement object will be cached so future invocations of this function will not incur the overhead or recreating the statement. If ``name`` is provided it will be used as the key for the cache, so you'll be able to call ``execute`` using the name. :pram str query: The query to prepare. :pram str name: (Optional) name to use as a key in the cache. """ key = name or query stmt = CassandraConnection._prepared_statement_cache.get(key, None) if stmt is not None: return stmt stmt = self._session.prepare(query) CassandraConnection._prepared_statement_cache[key] = stmt return stmt
[docs] def execute(self, query, *args, **kwargs): """Asynchronously execute the specified CQL query. The execute command also takes optional parameters and trace keyword arguments. See cassandra-python documentation for definition of those parameters. """ tornado_future = Future() cassandra_future = self._session.execute_async(query, *args, **kwargs) self._ioloop.add_callback( self._callback, cassandra_future, tornado_future) return tornado_future
def _callback(self, cassandra_future, tornado_future): """Cassandra async I/O loop callback handler.""" try: result = cassandra_future.result() except Exception as exc: return tornado_future.set_exception(exc) tornado_future.set_result(result)