blob: 17a6527a7746f36f16c47f60611af426e8adcf27 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Formatter;
use async_trait::async_trait;
use crate::raw::adapters::kv;
use crate::Builder;
use crate::Error;
use crate::ErrorKind;
use crate::Scheme;
use crate::*;
/// Sled service support.
///
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct SledBuilder {
/// That path to the sled data directory.
datadir: Option<String>,
root: Option<String>,
}
impl SledBuilder {
/// Set the path to the sled data directory. Will create if not exists.
pub fn datadir(&mut self, path: &str) -> &mut Self {
self.datadir = Some(path.into());
self
}
/// Set the root for sled.
pub fn root(&mut self, path: &str) -> &mut Self {
self.root = Some(path.into());
self
}
}
impl Builder for SledBuilder {
const SCHEME: Scheme = Scheme::Sled;
type Accessor = SledBackend;
fn from_map(map: HashMap<String, String>) -> Self {
let mut builder = SledBuilder::default();
map.get("datadir").map(|v| builder.datadir(v));
map.get("root").map(|v| builder.root(v));
builder
}
fn build(&mut self) -> Result<Self::Accessor> {
let datadir_path = self.datadir.take().ok_or_else(|| {
Error::new(ErrorKind::ConfigInvalid, "datadir is required but not set")
.with_context("service", Scheme::Sled)
})?;
let db = sled::open(&datadir_path).map_err(|e| {
Error::new(ErrorKind::ConfigInvalid, "open db")
.with_context("service", Scheme::Sled)
.with_context("datadir", datadir_path.clone())
.set_source(e)
})?;
Ok(SledBackend::new(Adapter {
datadir: datadir_path,
db,
})
.with_root(self.root.as_deref().unwrap_or_default()))
}
}
/// Backend for sled services.
pub type SledBackend = kv::Backend<Adapter>;
#[derive(Clone)]
pub struct Adapter {
datadir: String,
db: sled::Db,
}
impl Debug for Adapter {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut ds = f.debug_struct("Adapter");
ds.field("path", &self.datadir);
ds.finish()
}
}
#[async_trait]
impl kv::Adapter for Adapter {
fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Scheme::Sled,
&self.datadir,
Capability {
read: true,
write: true,
list: true,
blocking: true,
..Default::default()
},
)
}
async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
self.blocking_get(path)
}
fn blocking_get(&self, path: &str) -> Result<Option<Vec<u8>>> {
Ok(self.db.get(path).map_err(parse_error)?.map(|v| v.to_vec()))
}
async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
self.blocking_set(path, value)
}
fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> {
self.db.insert(path, value).map_err(parse_error)?;
Ok(())
}
async fn delete(&self, path: &str) -> Result<()> {
self.blocking_delete(path)
}
fn blocking_delete(&self, path: &str) -> Result<()> {
self.db.remove(path).map_err(parse_error)?;
Ok(())
}
async fn scan(&self, path: &str) -> Result<Vec<String>> {
self.blocking_scan(path)
}
fn blocking_scan(&self, path: &str) -> Result<Vec<String>> {
let it = self.db.scan_prefix(path).keys();
let mut res = Vec::default();
for i in it {
let bs = i.map_err(parse_error)?.to_vec();
res.push(String::from_utf8(bs).map_err(|err| {
Error::new(ErrorKind::Unexpected, "store key is not valid utf-8 string")
.set_source(err)
})?);
}
Ok(res)
}
}
fn parse_error(err: sled::Error) -> Error {
Error::new(ErrorKind::Unexpected, "error from sled").set_source(err)
}