bdk_file_store/
entry_iter.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
use bincode::Options;
use std::{
    fs::File,
    io::{self, BufReader, Seek},
    marker::PhantomData,
};

use crate::bincode_options;

/// Iterator over entries in a file store.
///
/// Reads and returns an entry each time [`next`] is called. If an error occurs while reading,
/// the iterator will yield a `Result::Err(_)` instead and then `None` for the next call to
/// `next`.
///
/// [`next`]: Self::next
pub struct EntryIter<'t, T> {
    /// Buffered reader around the file
    db_file: BufReader<&'t mut File>,
    /// Set once iteration has ended; while `true`, `next` returns `None` without reading.
    finished: bool,
    /// The file position for the first read of `db_file`.
    ///
    /// Consumed (left as `None`) by the first call to `next`, which seeks there before reading.
    start_pos: Option<u64>,
    /// Marker for the entry type `T` that the iterator decodes; no `T` value is stored.
    types: PhantomData<T>,
}

impl<'t, T> EntryIter<'t, T> {
    /// Creates an iterator over the entries of `db_file`, starting at byte offset `start_pos`.
    ///
    /// The file is only wrapped in a buffered reader here; the seek to `start_pos` is performed
    /// by the first call to `next`.
    pub fn new(start_pos: u64, db_file: &'t mut File) -> Self {
        let db_file = BufReader::new(db_file);
        Self {
            db_file,
            finished: false,
            start_pos: Some(start_pos),
            types: PhantomData,
        }
    }
}

impl<T> Iterator for EntryIter<'_, T>
where
    T: serde::de::DeserializeOwned,
{
    type Item = Result<T, IterError>;

    /// Reads and decodes the next entry from the file.
    ///
    /// Returns:
    /// * `Some(Ok(entry))` when an entry was decoded;
    /// * `None` when the end of the file is hit before any byte of a new entry was read, and on
    ///   every call after the iterator has finished;
    /// * `Some(Err(_))` when seeking, querying the stream position, or decoding fails. The
    ///   iterator is finished afterwards, so every later call returns `None`.
    fn next(&mut self) -> Option<Self::Item> {
        if self.finished {
            return None;
        }

        let outcome = (|| -> Result<Option<T>, IterError> {
            // Only the first call positions the reader; `take` makes sure the seek happens once.
            if let Some(start) = self.start_pos.take() {
                self.db_file.seek(io::SeekFrom::Start(start))?;
            }

            let pos_before_read = self.db_file.stream_position()?;
            match bincode_options().deserialize_from(&mut self.db_file) {
                Ok(changeset) => Ok(Some(changeset)),
                Err(e) => {
                    self.finished = true;
                    let pos_after_read = self.db_file.stream_position()?;
                    // allow unexpected EOF if 0 bytes were read
                    if let bincode::ErrorKind::Io(inner) = &*e {
                        if inner.kind() == io::ErrorKind::UnexpectedEof
                            && pos_after_read == pos_before_read
                        {
                            return Ok(None);
                        }
                    }
                    // Rewind so the reader is left at the start of the entry that failed to
                    // decode rather than somewhere in the middle of it.
                    self.db_file.seek(io::SeekFrom::Start(pos_before_read))?;
                    Err(IterError::Bincode(*e))
                }
            }
        })();

        // Any error ends the iteration, including I/O failures from `seek`/`stream_position`
        // that happen before decoding starts. Without this, a failed initial seek (which has
        // already consumed `start_pos`) would let the next call decode from an arbitrary offset.
        if outcome.is_err() {
            self.finished = true;
        }
        outcome.transpose()
    }
}

impl<T> Drop for EntryIter<'_, T> {
    /// Moves the underlying file's offset to the buffered reader's current position.
    fn drop(&mut self) {
        // Syncing the file's offset with the buffer's position keeps the correct position for
        // whatever reads or writes the file next. This is best effort: a failure in either step
        // is ignored, and the file is only repositioned if the position could be obtained.
        let _ = self
            .db_file
            .stream_position()
            .and_then(|pos| self.db_file.get_mut().seek(io::SeekFrom::Start(pos)));
    }
}

/// Error type for [`EntryIter`].
#[derive(Debug)]
pub enum IterError {
    /// Failure to read from the file.
    ///
    /// Produced when seeking or querying the stream position fails.
    Io(io::Error),
    /// Failure to decode data from the file.
    ///
    /// Any error returned by bincode while decoding ends up here, including I/O errors that
    /// bincode wraps as `bincode::ErrorKind::Io`.
    Bincode(bincode::ErrorKind),
}

impl core::fmt::Display for IterError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            IterError::Io(e) => write!(f, "io error trying to read entry {}", e),
            IterError::Bincode(e) => write!(f, "bincode error while reading entry {}", e),
        }
    }
}

impl From<io::Error> for IterError {
    fn from(value: io::Error) -> Self {
        IterError::Io(value)
    }
}

// Relies on the trait's default methods only; the wrapped error's message is already part of
// this type's `Display` output.
impl std::error::Error for IterError {}