| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| use std::collections::HashMap; |
| use std::fmt::Debug; |
| use std::fmt::Formatter; |
| |
| use async_trait::async_trait; |
| |
| use crate::raw::adapters::kv; |
| use crate::Builder; |
| use crate::Error; |
| use crate::ErrorKind; |
| use crate::Scheme; |
| use crate::*; |
| |
| /// Sled service support. |
| /// |
| #[doc = include_str!("docs.md")] |
| #[derive(Default)] |
| pub struct SledBuilder { |
| /// That path to the sled data directory. |
| datadir: Option<String>, |
| root: Option<String>, |
| } |
| |
| impl SledBuilder { |
| /// Set the path to the sled data directory. Will create if not exists. |
| pub fn datadir(&mut self, path: &str) -> &mut Self { |
| self.datadir = Some(path.into()); |
| self |
| } |
| |
| /// Set the root for sled. |
| pub fn root(&mut self, path: &str) -> &mut Self { |
| self.root = Some(path.into()); |
| self |
| } |
| } |
| |
| impl Builder for SledBuilder { |
| const SCHEME: Scheme = Scheme::Sled; |
| type Accessor = SledBackend; |
| |
| fn from_map(map: HashMap<String, String>) -> Self { |
| let mut builder = SledBuilder::default(); |
| |
| map.get("datadir").map(|v| builder.datadir(v)); |
| map.get("root").map(|v| builder.root(v)); |
| |
| builder |
| } |
| |
| fn build(&mut self) -> Result<Self::Accessor> { |
| let datadir_path = self.datadir.take().ok_or_else(|| { |
| Error::new(ErrorKind::ConfigInvalid, "datadir is required but not set") |
| .with_context("service", Scheme::Sled) |
| })?; |
| |
| let db = sled::open(&datadir_path).map_err(|e| { |
| Error::new(ErrorKind::ConfigInvalid, "open db") |
| .with_context("service", Scheme::Sled) |
| .with_context("datadir", datadir_path.clone()) |
| .set_source(e) |
| })?; |
| |
| Ok(SledBackend::new(Adapter { |
| datadir: datadir_path, |
| db, |
| }) |
| .with_root(self.root.as_deref().unwrap_or_default())) |
| } |
| } |
| |
| /// Backend for sled services. |
| pub type SledBackend = kv::Backend<Adapter>; |
| |
| #[derive(Clone)] |
| pub struct Adapter { |
| datadir: String, |
| db: sled::Db, |
| } |
| |
| impl Debug for Adapter { |
| fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { |
| let mut ds = f.debug_struct("Adapter"); |
| ds.field("path", &self.datadir); |
| ds.finish() |
| } |
| } |
| |
| #[async_trait] |
| impl kv::Adapter for Adapter { |
| fn metadata(&self) -> kv::Metadata { |
| kv::Metadata::new( |
| Scheme::Sled, |
| &self.datadir, |
| Capability { |
| read: true, |
| write: true, |
| list: true, |
| blocking: true, |
| ..Default::default() |
| }, |
| ) |
| } |
| |
| async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> { |
| self.blocking_get(path) |
| } |
| |
| fn blocking_get(&self, path: &str) -> Result<Option<Vec<u8>>> { |
| Ok(self.db.get(path).map_err(parse_error)?.map(|v| v.to_vec())) |
| } |
| |
| async fn set(&self, path: &str, value: &[u8]) -> Result<()> { |
| self.blocking_set(path, value) |
| } |
| |
| fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> { |
| self.db.insert(path, value).map_err(parse_error)?; |
| |
| Ok(()) |
| } |
| |
| async fn delete(&self, path: &str) -> Result<()> { |
| self.blocking_delete(path) |
| } |
| |
| fn blocking_delete(&self, path: &str) -> Result<()> { |
| self.db.remove(path).map_err(parse_error)?; |
| |
| Ok(()) |
| } |
| |
| async fn scan(&self, path: &str) -> Result<Vec<String>> { |
| self.blocking_scan(path) |
| } |
| |
| fn blocking_scan(&self, path: &str) -> Result<Vec<String>> { |
| let it = self.db.scan_prefix(path).keys(); |
| let mut res = Vec::default(); |
| |
| for i in it { |
| let bs = i.map_err(parse_error)?.to_vec(); |
| |
| res.push(String::from_utf8(bs).map_err(|err| { |
| Error::new(ErrorKind::Unexpected, "store key is not valid utf-8 string") |
| .set_source(err) |
| })?); |
| } |
| |
| Ok(res) |
| } |
| } |
| |
| fn parse_error(err: sled::Error) -> Error { |
| Error::new(ErrorKind::Unexpected, "error from sled").set_source(err) |
| } |