Skip to content

Commit 98f8d06

Browse files
committed
libsql: WAL sync baton handling
1 parent a05ad5d commit 98f8d06

1 file changed

Lines changed: 33 additions & 12 deletions

File tree

libsql/src/sync.rs

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ pub enum SyncError {
4444
JsonEncode(serde_json::Error),
4545
#[error("failed to push frame: status={0}, error={1}")]
4646
PushFrame(StatusCode, String),
47+
#[error("no baton from WAL push operation")]
48+
NoBatonFromPush,
4749
#[error("failed to verify metadata file version: expected={0}, got={1}")]
4850
VerifyVersion(u32, u32),
4951
#[error("failed to verify metadata file hash: expected={0}, got={1}")]
@@ -75,7 +77,7 @@ pub struct PushResult {
7577
}
7678

7779
pub enum PushStatus {
78-
Ok,
80+
Ok { baton: String },
7981
Conflict,
8082
}
8183

@@ -161,28 +163,33 @@ impl SyncContext {
161163
#[tracing::instrument(skip(self, frames))]
162164
pub(crate) async fn push_frames(
163165
&mut self,
166+
baton: Option<String>,
164167
frames: Bytes,
165168
generation: u32,
166169
frame_no: u32,
167170
frames_count: u32,
168-
) -> Result<u32> {
169-
let uri = format!(
171+
) -> Result<(Option<String>, u32)> {
172+
let mut uri = format!(
170173
"{}/sync/{}/{}/{}",
171174
self.sync_url,
172175
generation,
173176
frame_no,
174177
frame_no + frames_count
175178
);
176-
tracing::debug!("pushing frame");
179+
if let Some(baton) = baton {
180+
uri += &format!("/{}", baton);
181+
}
177182

178-
let result = self.push_with_retry(uri, frames, self.max_retries).await?;
183+
tracing::debug!("pushing frame with uri: {}", uri);
179184

180-
match result.status {
185+
let result = self.push_with_retry(uri, frames, self.max_retries).await?;
186+
187+
let baton = match result.status {
181188
PushStatus::Conflict => {
182189
return Err(SyncError::InvalidPushFrameConflict(frame_no, result.max_frame_no).into());
183190
}
184-
_ => {}
185-
}
191+
PushStatus::Ok { baton } => baton,
192+
};
186193
let generation = result.generation;
187194
let durable_frame_num = result.max_frame_no;
188195

@@ -217,7 +224,7 @@ impl SyncContext {
217224
self.durable_generation = generation;
218225
self.durable_frame_num = durable_frame_num;
219226

220-
Ok(durable_frame_num)
227+
Ok((Some(baton), durable_frame_num))
221228
}
222229

223230
async fn push_with_retry(&self, mut uri: String, body: Bytes, max_retries: usize) -> Result<PushResult> {
@@ -250,6 +257,11 @@ impl SyncContext {
250257
let resp = serde_json::from_slice::<serde_json::Value>(&res_body[..])
251258
.map_err(SyncError::JsonDecode)?;
252259

260+
let baton: Option<String> = resp
261+
.get("baton")
262+
.map(|v| v.as_str().map(String::from))
263+
.flatten();
264+
253265
let status = resp
254266
.get("status")
255267
.ok_or_else(|| SyncError::JsonValue(resp.clone()))?;
@@ -275,7 +287,13 @@ impl SyncContext {
275287
.ok_or_else(|| SyncError::JsonValue(max_frame_no.clone()))?;
276288

277289
let status = match status {
278-
"ok" => PushStatus::Ok,
290+
"ok" => {
291+
if let Some(baton) = baton {
292+
PushStatus::Ok { baton }
293+
} else {
294+
return Err(SyncError::NoBatonFromPush.into());
295+
}
296+
},
279297
"conflict" => PushStatus::Conflict,
280298
_ => return Err(SyncError::JsonValue(resp.clone()).into()),
281299
};
@@ -601,6 +619,7 @@ async fn try_push(
601619
});
602620
}
603621

622+
let mut baton: Option<String> = None;
604623
let generation = sync_ctx.durable_generation();
605624
let start_frame_no = sync_ctx.durable_frame_num() + 1;
606625
let end_frame_no = max_frame_no;
@@ -620,10 +639,12 @@ async fn try_push(
620639
// The server returns its maximum frame number. To avoid resending
621640
// frames the server already knows about, we need to update the
622641
// frame number to the one returned by the server.
623-
let max_frame_no = sync_ctx
624-
.push_frames(frames.freeze(), generation, frame_no, batch_size)
642+
let (new_baton, max_frame_no) = sync_ctx
643+
.push_frames(baton.clone(), frames.freeze(), generation, frame_no, batch_size)
625644
.await?;
626645

646+
baton = new_baton;
647+
627648
if max_frame_no > frame_no {
628649
frame_no = max_frame_no + 1;
629650
} else {

0 commit comments

Comments
 (0)