Skip to content

Commit 454f741

Browse files
MasterPtatoNathanFlurry
authored andcommitted
fix: move actor state to fdb
1 parent 1a1ed3f commit 454f741

File tree

36 files changed

+673
-362
lines changed

36 files changed

+673
-362
lines changed

Cargo.toml

+1-4
Large diffs are not rendered by default.

packages/common/chirp-workflow/core/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ rivet-runtime.workspace = true
3434
rivet-util.workspace = true
3535
serde = { version = "1.0.198", features = ["derive"] }
3636
serde_json = "1.0.116"
37-
sqlite-util.workspace = true
3837
strum = { version = "0.26", features = ["derive"] }
3938
thiserror = "1.0.59"
4039
tokio = { version = "1.40.0", features = ["full"] }

packages/common/chirp-workflow/core/src/ctx/activity.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ impl ActivityCtx {
239239
}
240240

241241
pub async fn sqlite_for_workflow(&self, workflow_id: Uuid) -> GlobalResult<SqlitePool> {
242-
common::sqlite_for_workflow(&self.db, &self.conn, workflow_id, false).await
242+
common::sqlite_for_workflow(&self.db, &self.conn, workflow_id, true).await
243243
}
244244

245245
// Backwards compatibility

packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/debug.rs

+33-19
Original file line numberDiff line numberDiff line change
@@ -380,24 +380,23 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
380380
.unpack::<JustUuid>(entry.key())
381381
.map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?;
382382

383-
if current_workflow_id
384-
.map(|x| workflow_id != x)
385-
.unwrap_or_default()
386-
{
387-
// Save if matches query
388-
if matching_tags == tags.len() && name_matches && state_matches {
389-
workflow_ids.push(workflow_id);
390-
391-
if workflow_ids.len() >= 100 {
392-
current_workflow_id = None;
393-
break;
383+
if let Some(curr) = current_workflow_id {
384+
if workflow_id != curr {
385+
// Save if matches query
386+
if matching_tags == tags.len() && name_matches && state_matches {
387+
workflow_ids.push(curr);
388+
389+
if workflow_ids.len() >= 100 {
390+
current_workflow_id = None;
391+
break;
392+
}
394393
}
394+
395+
// Reset state
396+
matching_tags = 0;
397+
name_matches = name.is_none();
398+
state_matches = state.is_none() || state == Some(WorkflowState::Dead);
395399
}
396-
397-
// Reset state
398-
matching_tags = 0;
399-
name_matches = name.is_none();
400-
state_matches = state.is_none() || state == Some(WorkflowState::Dead);
401400
}
402401

403402
current_workflow_id = Some(workflow_id);
@@ -499,13 +498,24 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
499498
.serialize(())
500499
.map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?,
501500
);
501+
502+
let has_wake_condition_key = keys::workflow::HasWakeConditionKey::new(workflow_id);
503+
tx.set(
504+
&self.subspace.pack(&has_wake_condition_key),
505+
&has_wake_condition_key
506+
.serialize(())
507+
.map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?,
508+
);
502509
}
503510

504511
Ok(())
505512
}
506513
})
507-
.await
508-
.map_err(Into::into)
514+
.await?;
515+
516+
self.wake_worker();
517+
518+
Ok(())
509519
}
510520

511521
async fn get_workflow_history(
@@ -721,7 +731,11 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
721731
sql_fetch_all!(
722732
[SqlStub {}, ActivityErrorRow, pool]
723733
"
724-
SELECT json(location) AS location, error, COUNT(error), MAX(ts) AS latest_ts
734+
SELECT
735+
json(location) AS location,
736+
error,
737+
COUNT(error) AS count,
738+
MAX(ts) AS latest_ts
725739
FROM workflow_activity_errors
726740
GROUP BY location, error
727741
ORDER BY latest_ts

packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/sqlite/mod.rs

-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use std::collections::HashMap;
33
use include_dir::{include_dir, Dir, File};
44
use indoc::indoc;
55
use rivet_pools::prelude::*;
6-
use sqlite_util::SqlitePoolExt;
76
use sqlx::Acquire;
87
use uuid::Uuid;
98

packages/common/hub-embed/build.rs

+11-9
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,17 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
2727
println!("stderr:\n{}", String::from_utf8_lossy(&output.stderr));
2828
assert!(output.status.success(), "yarn install failed");
2929

30-
// println!("Running yarn build");
31-
// let output = Command::new("yarn")
32-
// .current_dir(&hub_path)
33-
// .args(["dlx", "turbo", "run", "build:embedded"])
34-
// .env("VITE_APP_API_URL", "__APP_API_URL__")
35-
// .output()?;
36-
// println!("stdout:\n{}", String::from_utf8_lossy(&output.stdout));
37-
// println!("stderr:\n{}", String::from_utf8_lossy(&output.stderr));
38-
// assert!(output.status.success(), "hub build failed");
30+
if std::env::var("RIVET_SKIP_BUILD_HUB").is_err() {
31+
println!("Running yarn build");
32+
let output = Command::new("yarn")
33+
.current_dir(&hub_path)
34+
.args(["dlx", "turbo", "run", "build:embedded"])
35+
.env("VITE_APP_API_URL", "__APP_API_URL__")
36+
.output()?;
37+
println!("stdout:\n{}", String::from_utf8_lossy(&output.stdout));
38+
println!("stderr:\n{}", String::from_utf8_lossy(&output.stderr));
39+
assert!(output.status.success(), "hub build failed");
40+
}
3941

4042
// Copy dist directory to out_dir
4143
let dist_path = hub_path.join("dist");

packages/common/pools/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ rand = "0.8"
2424
rivet-config.workspace = true
2525
rivet-metrics.workspace = true
2626
service-discovery.workspace = true
27-
sqlite-util.workspace = true
2827
tempfile = "3.13.0"
2928
thiserror = "1.0"
3029
tokio = { version = "1.40", features = ["tracing"] }

packages/common/pools/src/db/sqlite/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use dirs;
2-
use fdb_util::prelude::*;
2+
use fdb_util::{SERIALIZABLE, prelude::*};
33
use foundationdb::{self as fdb, options::StreamingMode, tuple::Subspace, FdbBindingError};
44
use uuid::Uuid;
55

@@ -268,7 +268,7 @@ impl SqlitePoolManager {
268268
mode: StreamingMode::WantAll,
269269
..(&db_data_subspace).into()
270270
},
271-
false,
271+
SERIALIZABLE,
272272
);
273273

274274
// Aggregate data

packages/common/server-cli/Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,14 @@ clap = { version = "4.3", features = ["derive"] }
1313
colored_json = "5.0.0"
1414
futures-util = "0.3"
1515
global-error.workspace = true
16-
hex = "0.4"
16+
hex.workspace = true
1717
include_dir = "0.7.4"
1818
indoc = "2.0.5"
1919
reqwest = "0.12.9"
2020
foundationdb.workspace = true
2121
rivet-api.workspace = true
2222
rivet-config.workspace = true
23+
rivet-logs.workspace = true
2324
rivet-migrate.workspace = true
2425
rivet-pools.workspace = true
2526
rivet-runtime.workspace = true

packages/common/server-cli/src/commands/fdb/cli.rs

+24-9
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,7 @@ use crate::util::{
1414
format::indent_string,
1515
};
1616

17-
// #[derive(Parser)]
18-
// #[command(name = "")]
19-
// struct Command {
20-
// #[command(subcommand)]
21-
// command: Commands,
22-
// }
23-
17+
// TODO: Tab completion
2418
#[derive(Parser)]
2519
#[command(name = "")]
2620
pub enum SubCommand {
@@ -72,6 +66,9 @@ pub enum SubCommand {
7266
/// In what manner to clear the current key. Range clears the entire subspace.
7367
#[arg(value_enum)]
7468
clear_type: Option<ClearType>,
69+
/// Disable confirmation prompt.
70+
#[arg(short = 'y', long, default_value_t = false)]
71+
yes: bool,
7572
},
7673

7774
#[command(name = "exit")]
@@ -95,7 +92,10 @@ impl SubCommand {
9592
Err(err) => println!("{err:#}"),
9693
},
9794
// TODO: chunks
98-
SubCommand::Get { type_hint, chunks: _chunks } => {
95+
SubCommand::Get {
96+
type_hint,
97+
chunks: _chunks,
98+
} => {
9999
let fut = pool.run(|tx, _mc| {
100100
let current_tuple = current_tuple.clone();
101101
async move {
@@ -206,6 +206,7 @@ impl SubCommand {
206206
);
207207

208208
last_key = curr.clone();
209+
current_hidden_subspace = None;
209210
hidden_count = 0;
210211
}
211212

@@ -323,7 +324,21 @@ impl SubCommand {
323324
Err(_) => println!("txn timed out"),
324325
}
325326
}
326-
SubCommand::Clear { clear_type } => {
327+
SubCommand::Clear { clear_type, yes } => {
328+
if !yes {
329+
let term = rivet_term::terminal();
330+
let response = rivet_term::prompt::PromptBuilder::default()
331+
.message("Are you sure?")
332+
.build()
333+
.expect("failed to build prompt")
334+
.bool(&term)
335+
.await
336+
.expect("failed to show prompt");
337+
if !response {
338+
return CommandResult::Error;
339+
}
340+
}
341+
327342
let fut = pool.run(|tx, _mc| {
328343
let current_tuple = current_tuple.clone();
329344
async move {

packages/common/server-cli/src/commands/fdb/mod.rs

+9-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use std::result::Result::{Err, Ok};
1+
use std::{
2+
path::Path,
3+
result::Result::{Err, Ok},
4+
};
25

36
use anyhow::*;
47
use clap::Parser;
@@ -29,7 +32,10 @@ impl Opts {
2932
run_commands(&pool, &mut current_tuple, query).await;
3033
} else {
3134
let mut rl = DefaultEditor::new()?;
32-
rl.load_history("/tmp/history.txt")?;
35+
let history_location = Path::new("/tmp/rivet-server-fdb-history");
36+
if history_location.exists() {
37+
rl.load_history(&history_location)?;
38+
}
3339

3440
println!("FDB Viewer\n");
3541

@@ -53,7 +59,7 @@ impl Opts {
5359
}
5460
}
5561

56-
rl.save_history("/tmp/history.txt")?;
62+
rl.save_history(&history_location)?;
5763
}
5864

5965
Ok(())

packages/common/server-cli/src/commands/start.rs

+20
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
1+
use std::{path::Path, time::Duration};
2+
13
use anyhow::*;
24
use clap::Parser;
35
use rivet_service_manager::{CronConfig, RunConfig};
46

7+
// 7 day logs retention
8+
const LOGS_RETENTION: Duration = Duration::from_secs(7 * 24 * 60 * 60);
9+
510
#[derive(Parser)]
611
pub struct Opts {
712
#[arg(long)]
@@ -42,6 +47,21 @@ impl Opts {
4247
config: rivet_config::Config,
4348
run_config: &RunConfig,
4449
) -> Result<()> {
50+
// Redirect logs if enabled on the edge
51+
if config
52+
.server()
53+
.ok()
54+
.and_then(|x| x.rivet.edge.as_ref())
55+
.and_then(|x| x.redirect_logs)
56+
.unwrap_or_default()
57+
{
58+
let logs_path = Path::new("/var/log/rivet-edge-server");
59+
std::fs::create_dir_all(logs_path)?;
60+
rivet_logs::Logs::new(logs_path.to_path_buf(), LOGS_RETENTION)
61+
.start()
62+
.await?;
63+
}
64+
4565
// Provision services before starting server
4666
if !self.skip_provision {
4767
s3_util::provision(config.clone(), &run_config.s3_buckets).await?;

packages/common/server-cli/src/util/fdb.rs

+45-6
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ pub enum SimpleTupleValue {
2626

2727
impl SimpleTupleValue {
2828
fn parse(value: &str) -> Self {
29-
if let Ok(v) = value.parse::<u64>() {
30-
SimpleTupleValue::U64(v)
31-
} else if let Ok(v) = value.parse::<i64>() {
29+
if let Ok(v) = value.parse::<i64>() {
3230
SimpleTupleValue::I64(v)
31+
} else if let Ok(v) = value.parse::<u64>() {
32+
SimpleTupleValue::U64(v)
3333
} else if let Ok(v) = value.parse::<f64>() {
3434
SimpleTupleValue::F64(v)
3535
} else if let Ok(v) = Uuid::from_str(value) {
@@ -54,7 +54,24 @@ impl fmt::Display for SimpleTupleValue {
5454
write!(f, "{}", style(v).green())
5555
}
5656
}
57-
SimpleTupleValue::Bytes(v) => write!(f, "{:?}", style(v).italic()),
57+
SimpleTupleValue::Bytes(v) => {
58+
let hex_string = if v.len() > 512 { &v[..512] } else { v }
59+
.iter()
60+
.map(|byte| format!("{:02x}", byte))
61+
.collect::<String>();
62+
write!(f, "{}", style(hex_string).italic())?;
63+
64+
if v.len() > 512 {
65+
write!(
66+
f,
67+
"{} {}",
68+
style("...").italic(),
69+
style(format!("({} bytes)", v.len())).dim()
70+
)?;
71+
}
72+
73+
Ok(())
74+
}
5875
}
5976
}
6077
}
@@ -218,7 +235,7 @@ impl SimpleValue {
218235
Some("str") => SimpleValue::String(value.to_string()),
219236
Some("bytes") | Some("b") => {
220237
let bytes = hex::decode(value.as_bytes())
221-
.with_context(|| format!("Could not parse `{value:?}` as hex encoded bytes"))?;
238+
.with_context(|| format!("Could not parse `{value}` as hex encoded bytes"))?;
222239
SimpleValue::Bytes(bytes)
223240
}
224241
Some(type_hint) => bail!("unknown type: `{type_hint}`"),
@@ -258,7 +275,24 @@ impl fmt::Display for SimpleValue {
258275
}
259276
}
260277
SimpleValue::String(v) => write!(f, "{}", style(v).green()),
261-
SimpleValue::Bytes(v) => write!(f, "{:?}", style(v).italic()),
278+
SimpleValue::Bytes(v) => {
279+
let hex_string = if v.len() > 512 { &v[..512] } else { v }
280+
.iter()
281+
.map(|byte| format!("{:02x}", byte))
282+
.collect::<String>();
283+
write!(f, "{}", style(hex_string).italic())?;
284+
285+
if v.len() > 512 {
286+
write!(
287+
f,
288+
"{} {}",
289+
style("...").italic(),
290+
style(format!("({} bytes)", v.len())).dim()
291+
)?;
292+
}
293+
294+
Ok(())
295+
}
262296
}
263297
}
264298
}
@@ -299,6 +333,11 @@ impl SimpleTupleSegment {
299333
Some("uuid") => Uuid::from_str(value)
300334
.map(SimpleTupleValue::Uuid)
301335
.with_context(|| format!("Could not parse `{value}` as UUID"))?,
336+
Some("bytes") | Some("b") => {
337+
let bytes = hex::decode(value.as_bytes())
338+
.with_context(|| format!("Could not parse `{value}` as hex encoded bytes"))?;
339+
SimpleTupleValue::Bytes(bytes)
340+
}
302341
Some("str") => SimpleTupleValue::String(value.to_string()),
303342
Some(prefix) => bail!("unknown type: `{prefix}`"),
304343
_ => SimpleTupleValue::parse(value),

0 commit comments

Comments
 (0)