核心API Link to heading
上篇已经完成了编解码的抽象。本篇为MMKV功能的完整实现。
回顾一下需求,我们需要一个能存取key-Buffer的组件,内部组合前面设计的MemoryMap
,Encoder
,Decoder
来实现put,get以及去重和扩容的基本能力。
其结构如下:
pub struct MmkvImpl {
// 内存缓存,这里直接使用HashMap
kv_map: HashMap<String, Buffer>,
is_valid: bool,
// 一个运行在子线程的消息队列实现,
// 我们需要将io发送到子线程执行
io_looper: Option<IOLooper>,
}
// 同时也为其实现了Drop,在析构时将等待io任务完成
impl Drop for MmkvImpl {
fn drop(&mut self) {
// IOLooper的drop实现将等待内部队列全部消耗完
drop(self.io_looper.take());
}
}
核心API很简单:
// impl MmkvImpl
// 以下为了简便排除了encryption版本的代码,完整代码参考源码仓库
// 参数中的Config封装了一些文件操作相关的细节
pub fn new(config: Config) -> Self {
let encoder = Box::new(CrcEncoderDecoder);
let mut kv_map = HashMap::new();
// 构造mmap,这里config已经为我们打开了对应的文件
let mm = MemoryMap::new(&config.file, config.file_size() as usize);
let decoder = Box::new(CrcEncoderDecoder);
let mut decoded_position = 0;
// 使用迭代器解码数据,这里闭包内使用了CrcEncoderDecoder实现
mm.iter(|bytes, position| decoder.decode_bytes(bytes, position))
.for_each(|buffer: Option<Buffer>| {
decoded_position += 1;
if let Some(data) = buffer {
// 将解码的数据存入kv_map之中
kv_map.insert(data.key().to_string(), data);
}
});
// 这里的IOWriter将在下文详细介绍,其封装了去重和扩容的关键能力
let io_writer = IOWriter::new(config, mm, decoded_position, encoder, decoder);
MmkvImpl {
kv_map,
is_valid: true,
// 这里IOLooper将启动一个线程取轮询消息队列的任务
io_looper: Some(IOLooper::new(io_writer)),
#[cfg(feature = "encryption")]
encryptor,
}
}
pub fn put(&mut self, key: &str, raw_buffer: Buffer) -> Result<()> {
if !self.is_valid {
return Err(InstanceClosed);
}
// 将key和buffer存入内存中的kv_map
let result = self.kv_map.insert(key.to_string(), raw_buffer.clone());
// 如果发现key有重,则告知IOWriter可能需要去重
let duplicated = result.is_some();
// 向io线程发送任务,其中的闭包将在io线程执行
self.io_looper.as_ref().unwrap().post(move |callback| {
// callback实际是上面构造MmkvImpl时传给IOLooper的IOWriter
callback
.downcast_mut::<IOWriter>()
.unwrap()
// 调用IOWriter::write将内容写入mmap并sync到文件
.write(raw_buffer, duplicated)
})
}
// get的实现非常简单,仅仅是从HashMap取值
pub fn get(&self, key: &str) -> Result<&Buffer> {
if !self.is_valid {
return Err(InstanceClosed);
}
match self.kv_map.get(key) {
Some(buffer) => Ok(buffer),
None => Err(Error::KeyNotFound),
}
}
单例封装以及多线程安全 Link to heading
我们需要将MmkvImpl包装为单例对外暴露,以方便开发者使用:
// 静态单例,使用RwLock包装以保证多线程能安全访问
static MMKV_INSTANCE: RwLock<Option<MmkvImpl>> = RwLock::new(None);
pub struct MMKV;
impl MMKV {
// 以下为了简便排除了encryption版本的代码,完整代码参考源码仓库
pub fn initialize(dir: &str) {
// 获取写锁
let mut instance = MMKV_INSTANCE.write().unwrap();
// 释放旧实例
drop(instance.take());
let file_path = MMKV::resolve_file_path(dir);
// 打开文件,默认大小为一个page_size,同时扩容的标准也是按page_size增加
let config = Config::new(file_path.as_path(), page_size() as u64);
// 这里利用RwLock的内部可变性,将新实例存入单例
*instance = Some(MmkvImpl::new(config));
}
pub fn put_str(key: &str, value: &str) -> Result<()> {
// 先获取写锁
match MMKV_INSTANCE.write().unwrap().as_mut() {
// 使用Buffer将数据编码然后存入MmkvImpl的实例之中
Some(mmkv) => mmkv.put(key, Buffer::from_str(key, value)),
None => Err(InstanceClosed),
}
}
pub fn get_str(key: &str) -> Result<String> {
// 先获取读锁
match MMKV_INSTANCE.read().unwrap().as_ref() {
// 将取出的Buffer解码为String
Some(mmkv) => mmkv.get(key)?.decode_str(),
None => Err(InstanceClosed),
}
}
}
如此一来库使用者就可以使用如下形式的代码来调用:
use mmkv::MMKV;
let temp_dir = std::env::temp_dir();
MMKV::initialize(temp_dir.to_str().unwrap());
MMKV::put_str("key1", "1").unwrap();
assert_eq!(MMKV::get_str("key1"), Ok("1".to_string()));
去重和扩容 Link to heading
到现在还有一个关键的坑没有填,那就是在文件读写时的去重和扩容。这部分封装在IOWriter
之中,其结构如下:
pub struct IOWriter {
config: Config,
mm: MemoryMap,
position: u32,
need_trim: bool,
encoder: Box<dyn Encoder>,
decoder: Box<dyn Decoder>,
}
// 实现IOLooper的Callback
impl Callback for IOWriter {}
impl IOWriter {
pub fn write(&mut self, buffer: Buffer, duplicated: bool) {
// 调用Encoder将Buffer编码为byte数组
let data = self
.encoder
.encode_to_bytes(&buffer, self.position)
.unwrap();
let target_end = data.len() + self.mm.offset();
let max_len = self.mm.len();
if duplicated {
// 如果写了重复的key,则标记空间不足时需要去重
self.need_trim = true;
}
if target_end <= max_len {
// mmap的容量足够,直接写入
self.mm.append(data).unwrap();
self.position += 1;
return;
}
// 容量不够,需要去重
if self.need_trim {
// 先从mmap中解码出完整的map
let mut snapshot: HashMap<String, Buffer> = HashMap::new();
self.mm
// 这里也是IOWriter需要持有Decoder的原因
.iter(|bytes, position| self.decoder.decode_bytes(bytes, position))
.for_each(|buffer| {
if let Some(data) = buffer {
// mmap里如果有重复的key,后解码的数据将覆盖前面的数据,
// 所以解码完成后HashMap将保证没有重复的key
snapshot.insert(data.key().to_string(), data);
}
});
// 由于mmap中只有旧数据,所以我们还要将待写入的buffer也加进去
// 同时如果这时有重复,HashMap也会保证新数据覆盖掉旧数据
snapshot.insert(buffer.key().to_string(), buffer);
// 重置mmap以便从头写入
self.mm
.reset()
.map_err(|e| EncodeFailed(e.to_string()))
.unwrap();
self.position = 0;
// 依次写入完整的map
for buffer in snapshot.values() {
let bytes = self.encoder.encode_to_bytes(buffer, self.position).unwrap();
// 在此过程中如果空间不够也需要先扩容
if self.mm.offset() + bytes.len() > max_len {
self.expand();
}
self.mm.append(bytes).unwrap();
self.position += 1;
}
// 重置去重的标记
self.need_trim = false;
} else {
// 如果不需要去重,则扩容然后写入
self.expand();
self.mm.append(data).unwrap();
self.position += 1;
}
}
// 扩容的实现非常简单,将文件长度增加page_size的大小,然后重新map整个文件
fn expand(&mut self) {
self.config.expand();
// 这里self.mm的旧值将会被Drop,释放之前的mmap
self.mm = MemoryMap::new(&self.config.file, self.config.file_size() as usize);
}
}
至此MMKV的核心功能已经全部完成,下一篇介绍下一个轻量log的实现。