NAME
"Net::Async::CassandraCQL" - use Cassandra databases with IO::Async
using CQL
SYNOPSIS
use feature qw( say );
use IO::Async::Loop;
use Net::Async::CassandraCQL;
use Protocol::CassandraCQL qw( CONSISTENCY_QUORUM );
my $loop = IO::Async::Loop->new;
my $cass = Net::Async::CassandraCQL->new(
host => "localhost",
keyspace => "my_keyspace",
default_consistency => CONSISTENCY_QUORUM,
);
$loop->add( $cass );
$cass->connect->get;
my @f;
foreach my $number ( 1 .. 100 ) {
push @f, $cass->query( "INSERT INTO numbers (v) VALUES ($number)" );
}
Future->needs_all( @f )->get;
my $get_stmt = $cass->prepare( "SELECT v FROM numbers" )->get;
my ( undef, $result ) = $get_stmt->execute( [] )->get;
foreach my $row ( $result->rows_hash ) {
say "We have a number " . $row->{v};
}
DESCRIPTION
This module allows use of the "CQL3" interface of a Cassandra database.
It fully supports asynchronous operation via IO::Async, allowing both
direct queries and prepared statements to be managed concurrently, if
required. Alternatively, as the interface is entirely based on Future
objects, it can be operated synchronously in a blocking fashion by
simply awaiting each individual operation by calling the "get" method.
It is based on Protocol::CassandraCQL, which more completely documents
the behaviours and limits of its ability to communicate with Cassandra.
EVENTS
on_node_up $nodeid
on_node_down $nodeid
The node's status has changed. $nodeid is the node's IP address as a
text string.
on_node_new $nodeid
on_node_removed $nodeid
A new node has been added to the cluster, or an existing node has been
decommissioned and removed.
These four events are obtained from event watches on the actual node
connections and filtered to remove duplicates. The use of multiple
primaries should improve the reliability of notifications, though if
multiple nodes fail at or around the same time this may go unreported,
as no node will ever report its own failure.
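As a rough illustration of how these events might be consumed, the sketch below keeps a small table of node status. It assumes the handlers are passed as named parameters to "new" and are invoked in the usual IO::Async::Notifier style, with the object itself as the first argument followed by $nodeid. The names %node_status and %event_handlers are hypothetical.

```perl
use strict;
use warnings;

# Hypothetical bookkeeping: node IP address => "up", "down" or "new".
my %node_status;

# Assumption: each handler receives the Net::Async::CassandraCQL object
# first, then the node's IP address as a text string.
my %event_handlers = (
   on_node_up      => sub { my ( undef, $nodeid ) = @_; $node_status{$nodeid} = "up" },
   on_node_down    => sub { my ( undef, $nodeid ) = @_; $node_status{$nodeid} = "down" },
   # Only record "new" if nothing is known about the node yet
   on_node_new     => sub { my ( undef, $nodeid ) = @_; $node_status{$nodeid} //= "new" },
   on_node_removed => sub { my ( undef, $nodeid ) = @_; delete $node_status{$nodeid} },
);

# The handlers would then be passed to the constructor, for example:
#   my $cass = Net::Async::CassandraCQL->new( host => "localhost", %event_handlers );
```

Because simultaneous failures may go unreported, a table like this is best treated as a hint rather than an authoritative view of the cluster.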
PARAMETERS
The following named parameters may be passed to "new" or "configure":
host => STRING
hosts => ARRAY of STRING
The hostnames of the Cassandra nodes to connect to initially. If more
than one host is provided in an array, they will be attempted
sequentially until one succeeds during the initial connect phase.
If a host contains :$service, then the service parameter will be
overridden for this host.
service => STRING
Optional. The service name or port number to connect to.
username => STRING
password => STRING
Optional. Authentication details to use for
"PasswordAuthenticator".
keyspace => STRING
Optional. If set, a "USE keyspace" query will be issued as part
of the connect method.
default_consistency => INT
Optional. Default consistency level to use if none is provided
to "query" or "execute".
primaries => INT
Optional. The number of primary node connections to maintain.
Defaults to 1 if not specified.
prefer_dc => STRING
Optional. If set, prefer to pick primary nodes from the given
data center, only falling back on others if there are not enough
available.
cql_version => INT
Optional. Version of the CQL wire protocol to negotiate during
connection. Defaults to 1.
METHODS
$str = $cass->quote( $str )
Quotes a string argument suitable for inclusion in an immediate CQL
query string.
In general, though, it is better to use a prepared query and pass the
value as an execute parameter.
$str = $cass->quote_identifier( $str )
Quotes an identifier name suitable for inclusion in a CQL query string.
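The quoting rules of CQL itself can be sketched in a few lines. The two functions below are hypothetical stand-ins written only to illustrate those rules (doubling the embedded quote character); they are not the module's implementation, and real code should call the "quote" and "quote_identifier" methods on the object.

```perl
use strict;
use warnings;

# Hypothetical stand-ins illustrating CQL's quoting rules; in real code the
# object's own "quote" and "quote_identifier" methods should be used.

# String literals: wrap in single quotes, doubling any embedded single quote.
sub sketch_quote
{
   my ( $str ) = @_;
   $str =~ s/'/''/g;
   return "'$str'";
}

# Identifiers: wrap in double quotes, doubling any embedded double quote.
sub sketch_quote_identifier
{
   my ( $str ) = @_;
   $str =~ s/"/""/g;
   return qq("$str");
}

my $cql = sprintf "SELECT v FROM %s WHERE name = %s",
   sketch_quote_identifier( "My Table" ), sketch_quote( "O'Brien" );
print "$cql\n";   # SELECT v FROM "My Table" WHERE name = 'O''Brien'
```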
$cass->connect( %args ) ==> ()
Connects to the Cassandra node and starts up the connection. The
returned Future will yield nothing on success.
Takes the following named arguments:
host => STRING
hosts => ARRAY of STRING
service => STRING
A set of host names is required, either as a named argument or as a
configured value on the object. If the service name is missing, the
default CQL port will be used instead. If a host contains a :$service,
that overrides both the service passed in and the service configured
on this object.
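The precedence described above can be sketched as a small standalone function. This is an illustration only, not the module's code: resolve_host_service is a hypothetical helper, 9042 is assumed to be the default CQL port (Cassandra's usual native-protocol port), and no attempt is made to handle IPv6 literals.

```perl
use strict;
use warnings;

# Assumption: 9042 is Cassandra's usual native-protocol (CQL) port.
use constant DEFAULT_CQL_PORT => 9042;

# Hypothetical helper: returns ( $hostname, $service ) for one host entry.
# Precedence: ":service" in the host, then the argument, then the object's
# configured value, then the default port.
sub resolve_host_service
{
   my ( $host, $arg_service, $configured_service ) = @_;

   # Split on the final colon only
   if( $host =~ m/^(.*):([^:]+)$/ ) {
      return ( $1, $2 );
   }
   return ( $host, $arg_service // $configured_service // DEFAULT_CQL_PORT );
}

foreach my $host ( "cass1.example.com", "cass2.example.com:9043" ) {
   my ( $name, $service ) = resolve_host_service( $host, undef, undef );
   print "$name => $service\n";
}
```

With neither a service argument nor a configured service, the first host falls back to the default port while the second keeps its own 9043.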
$cass->close_when_idle ==> $cass
Stops accepting new queries and prepares all the existing connections to
be closed once every outstanding query has been responded to. Returns a
future that will eventually yield the CassandraCQL object, when all the
connections are closed.
After calling this method it will be an error to invoke "query",
"prepare", "execute" or the various other methods derived from them.
$cass->close_now
Immediately closes all node connections and shuts down the object. Any
outstanding or queued queries will immediately fail. Consider this as a
"last resort" failure shutdown, as compared to the graceful draining
behaviour of "close_when_idle".
$cass->query( $cql, $consistency, %other_args ) ==> ( $type, $result )
Performs a CQL query. On success, the values returned from the Future
will depend on the type of query.
For "USE" queries, the type is "keyspace" and $result is a string giving
the name of the new keyspace.
For "CREATE", "ALTER" and "DROP" queries, the type is "schema_change"
and $result is a 3-element ARRAY reference containing the type of
change, the keyspace and the table name.
For "SELECT" queries, the type is "rows" and $result is an instance of
Protocol::CassandraCQL::Result containing the returned row data.
For other queries, such as "INSERT", "UPDATE" and "DELETE", the future
returns nothing.
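A caller that issues arbitrary CQL may want to branch on the result type. The following is a minimal sketch of such a dispatch; describe_query_result is a hypothetical helper, and the sample values passed to it at the end are made up for illustration.

```perl
use strict;
use warnings;

# Hypothetical helper: turns the ( $type, $result ) pair yielded by a
# query Future into a one-line description.
sub describe_query_result
{
   my ( $type, $result ) = @_;

   # INSERT, UPDATE, DELETE and similar yield nothing at all
   return "no result" unless defined $type;

   if( $type eq "keyspace" ) {
      return "now using keyspace $result";
   }
   elsif( $type eq "schema_change" ) {
      my ( $change, $keyspace, $table ) = @$result;
      return "schema change: $change on $keyspace.$table";
   }
   elsif( $type eq "rows" ) {
      # rows_hash is the accessor used in the SYNOPSIS
      my @rows = $result->rows_hash;
      return scalar(@rows) . " row(s) returned";
   }
   return "unrecognised result type $type";
}

# Typical use would be:
#   print describe_query_result( $cass->query( $cql, $consistency )->get ), "\n";
print describe_query_result( "keyspace", "my_keyspace" ), "\n";
print describe_query_result( schema_change => [ "CREATED", "my_keyspace", "numbers" ] ), "\n";
print describe_query_result(), "\n";
```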
%other_args may be any of the following, when using "cql_version" 2 or
above:
skip_metadata => BOOL
Requests the server does not include result metadata in the
response. It will be up to the caller to provide this, via
"set_metadata" on the returned Result object, before it can be
used.
page_size => INT
Requests that the server returns at most the given number of
rows. If any further remain, the result object will include the
"paging_state" field. This can be passed in another "query" call
to obtain the next set of data.
paging_state => INT
Requests that the server continues a paged request from this
position, given in a previous response.
serial_consistency => INT
Sets the consistency level for serial operations in the query.
Must be one of "CONSISTENCY_SERIAL" or
"CONSISTENCY_LOCAL_SERIAL".
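As a sketch of how "page_size" and "paging_state" might be combined, the helper below walks a paged SELECT until no further pages remain, blocking on each page. It rests on an assumption not stated above: that the result object exposes the state through a "paging_state" accessor returning undef once the final page has been delivered. fetch_all_rows is a hypothetical name, and the connection would need "cql_version" 2 or above.

```perl
use strict;
use warnings;

# Hypothetical helper: collects every row of a SELECT by following the
# paging state from one response to the next. Blocks on each page via "get".
# Assumption: the result object exposes the state through a "paging_state"
# accessor which returns undef once no further pages remain.
sub fetch_all_rows
{
   my ( $cass, $cql, $consistency, $page_size ) = @_;

   my @rows;
   my $paging_state;
   do {
      my ( undef, $result ) = $cass->query( $cql, $consistency,
         page_size => $page_size,
         # Only pass paging_state once a previous response has supplied one
         ( defined $paging_state ? ( paging_state => $paging_state ) : () ),
      )->get;

      push @rows, $result->rows_hash;
      $paging_state = $result->paging_state;
   } while( defined $paging_state );

   return @rows;
}

# Typical use would be:
#   my @rows = fetch_all_rows( $cass, "SELECT v FROM numbers", undef, 100 );
```

Collecting everything into one list defeats much of the purpose of paging for large tables; processing each page as it arrives would usually be preferable.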
$cass->query_rows( $cql, $consistency, %other_args ) ==> $result
A shortcut wrapper for "query" which expects a "rows" result and returns
it directly. Any other result is treated as an error. The returned
Future yields a "Protocol::CassandraCQL::Result".
$cass->prepare( $cql ) ==> $query
Prepares a CQL query for later execution. On success, the returned
Future yields an instance of a prepared query object (see below).
Query objects are cached internally, keyed by the CQL string; subsequent
calls to "prepare" with the same exact CQL string will yield the same
object immediately, saving a roundtrip.
$cass->execute( $query, $data, $consistency, %other_args ) ==> ( $type, $result )
Executes a previously-prepared statement, given the binding data. On
success, the returned Future will yield results of the same form as the
"query" method. $data should contain a list of encoded byte-string
values.
Normally this method is not directly required - instead, use the
"execute" method on the query object itself, as this will encode the
parameters correctly.
%other_args may be as for the "query" method.
CONVENIENT WRAPPERS
The following wrapper methods all wrap the basic "query" operation.
$cass->schema_keyspaces ==> $result
A shortcut to a "SELECT" query on "system.schema_keyspaces", which
returns a result object listing all the keyspaces.
Exact details of the returned columns will depend on the Cassandra
version, but the result should at least be keyed by the first column,
called "keyspace_name".
my $keyspaces = $result->rowmap_hash( "keyspace_name" )
$cass->schema_columnfamilies( $keyspace ) ==> $result
A shortcut to a "SELECT" query on "system.schema_columnfamilies", which
returns a result object listing all the columnfamilies of the given
keyspace.
Exact details of the returned columns will depend on the Cassandra
version, but the result should at least be keyed by the first column,
called "columnfamily_name".
my $columnfamilies = $result->rowmap_hash( "columnfamily_name" )
$cass->schema_columns( $keyspace, $columnfamily ) ==> $result
A shortcut to a "SELECT" query on "system.schema_columns", which returns
a result object listing all the columns of the given columnfamily.
Exact details of the returned columns will depend on the Cassandra
version, but the result should at least be keyed by the first column,
called "column_name".
my $columns = $result->rowmap_hash( "column_name" )
TODO
* Allow load-balancing strategies other than round-robin.
* Adjust connected primary nodes when changing "primaries"
parameter.
* Allow backup nodes, for faster connection failover.
* Support LZ4 compression when using CQL version 2.
This is blocked on RT #92825
SPONSORS
This code was paid for by
* Perceptyx
* Shadowcat Systems
AUTHOR
Paul Evans