核心API Link to heading

上篇已经完成了编解码的抽象。本篇为MMKV功能的完整实现。

回顾一下需求,我们需要一个能存取key-Buffer的组件,内部组合前面设计的MemoryMapEncoderDecoder来实现put,get以及去重和扩容的基本能力。

其结构如下:

pub struct MmkvImpl {
    // 内存缓存,这里直接使用HashMap
    kv_map: HashMap<String, Buffer>,
    is_valid: bool,
    // 一个运行在子线程的消息队列实现,
    // 我们需要将io发送到子线程执行
    io_looper: Option<IOLooper>,
}

// 同时也为其实现了Drop,在析构时将等待io任务完成
impl Drop for MmkvImpl {
    fn drop(&mut self) {
        // IOLooper的drop实现将等待内部队列全部消耗完
        drop(self.io_looper.take());
    }
}

核心API很简单:

// impl MmkvImpl

// 以下为了简便排除了encryption版本的代码,完整代码参考源码仓库
// 参数中的Config封装了一些文件操作相关的细节
pub fn new(config: Config) -> Self {
    let encoder = Box::new(CrcEncoderDecoder);
    let mut kv_map = HashMap::new();
    // 构造mmap,这里config已经为我们打开了对应的文件
    let mm = MemoryMap::new(&config.file, config.file_size() as usize);
    let decoder = Box::new(CrcEncoderDecoder);
    let mut decoded_position = 0;
    // 使用迭代器解码数据,这里闭包内使用了CrcEncoderDecoder实现
    mm.iter(|bytes, position| decoder.decode_bytes(bytes, position))
        .for_each(|buffer: Option<Buffer>| {
            decoded_position += 1;
            if let Some(data) = buffer {
                // 将解码的数据存入kv_map之中
                kv_map.insert(data.key().to_string(), data);
            }
        });
    
    // 这里的IOWriter将在下文详细介绍,其封装了去重和扩容的关键能力
    let io_writer = IOWriter::new(config, mm, decoded_position, encoder, decoder);
    MmkvImpl {
        kv_map,
        is_valid: true,
        // 这里IOLooper将启动一个线程取轮询消息队列的任务
        io_looper: Some(IOLooper::new(io_writer)),
        #[cfg(feature = "encryption")]
        encryptor,
    }
}

pub fn put(&mut self, key: &str, raw_buffer: Buffer) -> Result<()> {
    if !self.is_valid {
        return Err(InstanceClosed);
    }
    // 将key和buffer存入内存中的kv_map
    let result = self.kv_map.insert(key.to_string(), raw_buffer.clone());
    // 如果发现key有重,则告知IOWriter可能需要去重
    let duplicated = result.is_some();
    // 向io线程发送任务,其中的闭包将在io线程执行
    self.io_looper.as_ref().unwrap().post(move |callback| {
        // callback实际是上面构造MmkvImpl时传给IOLooper的IOWriter
        callback
            .downcast_mut::<IOWriter>()
            .unwrap()
            // 调用IOWriter::write将内容写入mmap并sync到文件
            .write(raw_buffer, duplicated)
    })
}

// get的实现非常简单,仅仅是从HashMap取值
pub fn get(&self, key: &str) -> Result<&Buffer> {
    if !self.is_valid {
        return Err(InstanceClosed);
    }
    match self.kv_map.get(key) {
        Some(buffer) => Ok(buffer),
        None => Err(Error::KeyNotFound),
    }
}

单例封装以及多线程安全 Link to heading

我们需要将MmkvImpl包装为单例对外暴露,以方便开发者使用:

// 静态单例,使用RwLock包装以保证多线程能安全访问
static MMKV_INSTANCE: RwLock<Option<MmkvImpl>> = RwLock::new(None);

pub struct MMKV;

impl MMKV {
    // 以下为了简便排除了encryption版本的代码,完整代码参考源码仓库
    pub fn initialize(dir: &str) {
        // 获取写锁
        let mut instance = MMKV_INSTANCE.write().unwrap();
        // 释放旧实例
        drop(instance.take());
        let file_path = MMKV::resolve_file_path(dir);
        // 打开文件,默认大小为一个page_size,同时扩容的标准也是按page_size增加
        let config = Config::new(file_path.as_path(), page_size() as u64);
        // 这里利用RwLock的内部可变性,将新实例存入单例
        *instance = Some(MmkvImpl::new(config));
    }

    pub fn put_str(key: &str, value: &str) -> Result<()> {
        // 先获取写锁
        match MMKV_INSTANCE.write().unwrap().as_mut() {
            // 使用Buffer将数据编码然后存入MmkvImpl的实例之中
            Some(mmkv) => mmkv.put(key, Buffer::from_str(key, value)),
            None => Err(InstanceClosed),
        }
    }

    pub fn get_str(key: &str) -> Result<String> {
        // 先获取读锁
        match MMKV_INSTANCE.read().unwrap().as_ref() {
            // 将取出的Buffer解码为String
            Some(mmkv) => mmkv.get(key)?.decode_str(),
            None => Err(InstanceClosed),
        }
    }
}

如此一来库使用者就可以使用如下形式的代码来调用:

use mmkv::MMKV;

let temp_dir = std::env::temp_dir();
MMKV::initialize(temp_dir.to_str().unwrap());
MMKV::put_str("key1", "1").unwrap();
assert_eq!(MMKV::get_str("key1"), Ok("1".to_string()));

去重和扩容 Link to heading

到现在还有一个关键的坑没有填,那就是在文件读写时的去重和扩容。这部分封装在IOWriter之中,其结构如下:

pub struct IOWriter {
    config: Config,
    mm: MemoryMap,
    position: u32,
    need_trim: bool,
    encoder: Box<dyn Encoder>,
    decoder: Box<dyn Decoder>,
}

// 实现IOLooper的Callback
impl Callback for IOWriter {}

impl IOWriter {
    pub fn write(&mut self, buffer: Buffer, duplicated: bool) {
        // 调用Encoder将Buffer编码为byte数组
        let data = self
            .encoder
            .encode_to_bytes(&buffer, self.position)
            .unwrap();
        let target_end = data.len() + self.mm.offset();
        let max_len = self.mm.len();
        if duplicated {
            // 如果写了重复的key,则标记空间不足时需要去重
            self.need_trim = true;
        }
        if target_end <= max_len {
            // mmap的容量足够,直接写入
            self.mm.append(data).unwrap();
            self.position += 1;
            return;
        }
        // 容量不够,需要去重
        if self.need_trim {
            // 先从mmap中解码出完整的map
            let mut snapshot: HashMap<String, Buffer> = HashMap::new();
            self.mm
                // 这里也是IOWriter需要持有Decoder的原因
                .iter(|bytes, position| self.decoder.decode_bytes(bytes, position))
                .for_each(|buffer| {
                    if let Some(data) = buffer {
                        // mmap里如果有重复的key,后解码的数据将覆盖前面的数据,
                        // 所以解码完成后HashMap将保证没有重复的key
                        snapshot.insert(data.key().to_string(), data);
                    }
                });
            // 由于mmap中只有旧数据,所以我们还要将待写入的buffer也加进去
            // 同时如果这时有重复,HashMap也会保证新数据覆盖掉旧数据
            snapshot.insert(buffer.key().to_string(), buffer);
            // 重置mmap以便从头写入
            self.mm
                .reset()
                .map_err(|e| EncodeFailed(e.to_string()))
                .unwrap();
            self.position = 0;
            // 依次写入完整的map
            for buffer in snapshot.values() {
                let bytes = self.encoder.encode_to_bytes(buffer, self.position).unwrap();
                // 在此过程中如果空间不够也需要先扩容
                if self.mm.offset() + bytes.len() > max_len {
                    self.expand();
                }
                self.mm.append(bytes).unwrap();
                self.position += 1;
            }
            // 重置去重的标记
            self.need_trim = false;
        } else {
            // 如果不需要去重,则扩容然后写入
            self.expand();
            self.mm.append(data).unwrap();
            self.position += 1;
        }
    }

    // 扩容的实现非常简单,将文件长度增加page_size的大小,然后重新map整个文件
    fn expand(&mut self) {
        self.config.expand();
        // 这里self.mm的旧值将会被Drop,释放之前的mmap
        self.mm = MemoryMap::new(&self.config.file, self.config.file_size() as usize);
    }
}

至此MMKV的核心功能已经全部完成,下一篇介绍下一个轻量log的实现。