blob: c8a9c59b26fd6b1c4caa9e85e2de1e33cdf0e6ca [file] [log] [blame]
use std::iter::Iterator;
use crate::cmd::{Arg, Cmd};
use crate::commands::is_readonly_cmd;
use crate::types::Value;
pub(crate) const SLOT_SIZE: u16 = 16384;
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) enum RoutingInfo {
AllNodes,
AllMasters,
Random,
MasterSlot(u16),
ReplicaSlot(u16),
}
impl RoutingInfo {
pub(crate) fn for_routable<R>(r: &R) -> Option<RoutingInfo>
where
R: Routable + ?Sized,
{
let cmd = &r.command()?[..];
match cmd {
b"FLUSHALL" | b"FLUSHDB" | b"SCRIPT" => Some(RoutingInfo::AllMasters),
b"ECHO" | b"CONFIG" | b"CLIENT" | b"SLOWLOG" | b"DBSIZE" | b"LASTSAVE" | b"PING"
| b"INFO" | b"BGREWRITEAOF" | b"BGSAVE" | b"CLIENT LIST" | b"SAVE" | b"TIME"
| b"KEYS" => Some(RoutingInfo::AllNodes),
b"SCAN" | b"CLIENT SETNAME" | b"SHUTDOWN" | b"SLAVEOF" | b"REPLICAOF"
| b"SCRIPT KILL" | b"MOVE" | b"BITOP" => None,
b"EVALSHA" | b"EVAL" => {
let key_count = r
.arg_idx(2)
.and_then(|x| std::str::from_utf8(x).ok())
.and_then(|x| x.parse::<u64>().ok())?;
if key_count == 0 {
Some(RoutingInfo::Random)
} else {
r.arg_idx(3).and_then(|key| RoutingInfo::for_key(cmd, key))
}
}
b"XGROUP" | b"XINFO" => r.arg_idx(2).and_then(|key| RoutingInfo::for_key(cmd, key)),
b"XREAD" | b"XREADGROUP" => {
let streams_position = r.position(b"STREAMS")?;
r.arg_idx(streams_position + 1)
.and_then(|key| RoutingInfo::for_key(cmd, key))
}
_ => match r.arg_idx(1) {
Some(key) => RoutingInfo::for_key(cmd, key),
None => Some(RoutingInfo::Random),
},
}
}
pub fn for_key(cmd: &[u8], key: &[u8]) -> Option<RoutingInfo> {
let key = match get_hashtag(key) {
Some(tag) => tag,
None => key,
};
let slot = crc16::State::<crc16::XMODEM>::calculate(key) % SLOT_SIZE;
if is_readonly_cmd(cmd) {
Some(RoutingInfo::ReplicaSlot(slot))
} else {
Some(RoutingInfo::MasterSlot(slot))
}
}
}
pub(crate) trait Routable {
// Convenience function to return ascii uppercase version of the
// the first argument (i.e., the command).
fn command(&self) -> Option<Vec<u8>> {
self.arg_idx(0).map(|x| x.to_ascii_uppercase())
}
// Returns a reference to the data for the argument at `idx`.
fn arg_idx(&self, idx: usize) -> Option<&[u8]>;
// Returns index of argument that matches `candidate`, if it exists
fn position(&self, candidate: &[u8]) -> Option<usize>;
}
impl Routable for Cmd {
fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
self.arg_idx(idx)
}
fn position(&self, candidate: &[u8]) -> Option<usize> {
self.args_iter().position(|a| match a {
Arg::Simple(d) => d.eq_ignore_ascii_case(candidate),
_ => false,
})
}
}
impl Routable for Value {
fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
match self {
Value::Bulk(args) => match args.get(idx) {
Some(Value::Data(ref data)) => Some(&data[..]),
_ => None,
},
_ => None,
}
}
fn position(&self, candidate: &[u8]) -> Option<usize> {
match self {
Value::Bulk(args) => args.iter().position(|a| match a {
Value::Data(d) => d.eq_ignore_ascii_case(candidate),
_ => false,
}),
_ => None,
}
}
}
#[derive(Debug)]
pub(crate) struct Slot {
start: u16,
end: u16,
master: String,
replicas: Vec<String>,
}
impl Slot {
pub fn new(s: u16, e: u16, m: String, r: Vec<String>) -> Self {
Self {
start: s,
end: e,
master: m,
replicas: r,
}
}
pub fn start(&self) -> u16 {
self.start
}
pub fn end(&self) -> u16 {
self.end
}
pub fn master(&self) -> &str {
&self.master
}
pub fn replicas(&self) -> &Vec<String> {
&self.replicas
}
}
fn get_hashtag(key: &[u8]) -> Option<&[u8]> {
let open = key.iter().position(|v| *v == b'{');
let open = match open {
Some(open) => open,
None => return None,
};
let close = key[open..].iter().position(|v| *v == b'}');
let close = match close {
Some(close) => close,
None => return None,
};
let rv = &key[open + 1..open + close];
if rv.is_empty() {
None
} else {
Some(rv)
}
}
#[cfg(test)]
mod tests {
use super::{get_hashtag, RoutingInfo};
use crate::{cmd, parser::parse_redis_value};
#[test]
fn test_get_hashtag() {
assert_eq!(get_hashtag(&b"foo{bar}baz"[..]), Some(&b"bar"[..]));
assert_eq!(get_hashtag(&b"foo{}{baz}"[..]), None);
assert_eq!(get_hashtag(&b"foo{{bar}}zap"[..]), Some(&b"{bar"[..]));
}
#[test]
fn test_routing_info_mixed_capatalization() {
let mut upper = cmd("XREAD");
upper.arg("STREAMS").arg("foo").arg(0);
let mut lower = cmd("xread");
lower.arg("streams").arg("foo").arg(0);
assert_eq!(
RoutingInfo::for_routable(&upper).unwrap(),
RoutingInfo::for_routable(&lower).unwrap()
);
let mut mixed = cmd("xReAd");
mixed.arg("StReAmS").arg("foo").arg(0);
assert_eq!(
RoutingInfo::for_routable(&lower).unwrap(),
RoutingInfo::for_routable(&mixed).unwrap()
);
}
#[test]
fn test_routing_info() {
let mut test_cmds = vec![];
// RoutingInfo::AllMasters
let mut test_cmd = cmd("FLUSHALL");
test_cmd.arg("");
test_cmds.push(test_cmd);
// RoutingInfo::AllNodes
test_cmd = cmd("ECHO");
test_cmd.arg("");
test_cmds.push(test_cmd);
// Routing key is 2nd arg ("42")
test_cmd = cmd("SET");
test_cmd.arg("42");
test_cmds.push(test_cmd);
// Routing key is 3rd arg ("FOOBAR")
test_cmd = cmd("XINFO");
test_cmd.arg("GROUPS").arg("FOOBAR");
test_cmds.push(test_cmd);
// Routing key is 3rd or 4th arg (3rd = "0" == RoutingInfo::Random)
test_cmd = cmd("EVAL");
test_cmd.arg("FOO").arg("0").arg("BAR");
test_cmds.push(test_cmd);
// Routing key is 3rd or 4th arg (3rd != "0" == RoutingInfo::Slot)
test_cmd = cmd("EVAL");
test_cmd.arg("FOO").arg("4").arg("BAR");
test_cmds.push(test_cmd);
// Routing key position is variable, 3rd arg
test_cmd = cmd("XREAD");
test_cmd.arg("STREAMS").arg("4");
test_cmds.push(test_cmd);
// Routing key position is variable, 4th arg
test_cmd = cmd("XREAD");
test_cmd.arg("FOO").arg("STREAMS").arg("4");
test_cmds.push(test_cmd);
for cmd in test_cmds {
let value = parse_redis_value(&cmd.get_packed_command()).unwrap();
assert_eq!(
RoutingInfo::for_routable(&value).unwrap(),
RoutingInfo::for_routable(&cmd).unwrap(),
);
}
}
}