@@ -518,9 +518,11 @@ pub async fn upgrade(
518
518
519
519
let build = resolve_build ( & ctx, game_id, env_id, body. build , body. build_tags . flatten ( ) ) . await ?;
520
520
521
- let mut sub = ctx
522
- . subscribe :: < ds:: workflows:: server:: UpgradeStarted > ( ( "server_id" , actor_id) )
523
- . await ?;
521
+ // TODO: Add back once we figure out how to cleanly handle if a wf is already complete when
522
+ // upgrading
523
+ // let mut sub = ctx
524
+ // .subscribe::<ds::workflows::server::UpgradeStarted>(("server_id", actor_id))
525
+ // .await?;
524
526
525
527
ctx. signal ( ds:: workflows:: server:: Upgrade {
526
528
image_id : build. build_id ,
@@ -529,7 +531,7 @@ pub async fn upgrade(
529
531
. send ( )
530
532
. await ?;
531
533
532
- sub. next ( ) . await ?;
534
+ // sub.next().await?;
533
535
534
536
Ok ( json ! ( { } ) )
535
537
}
@@ -611,13 +613,15 @@ pub async fn upgrade_all(
611
613
count += list_res. server_ids . len ( ) ;
612
614
cursor = list_res. server_ids . last ( ) . cloned ( ) ;
613
615
614
- let subs = futures_util:: stream:: iter ( list_res. server_ids . clone ( ) )
615
- . map ( |server_id| {
616
- ctx. subscribe :: < ds:: workflows:: server:: UpgradeStarted > ( ( "server_id" , server_id) )
617
- } )
618
- . buffer_unordered ( 32 )
619
- . try_collect :: < Vec < _ > > ( )
620
- . await ?;
616
+ // TODO: Add back once we figure out how to cleanly handle if a wf is already complete when
617
+ // upgrading
618
+ // let subs = futures_util::stream::iter(list_res.server_ids.clone())
619
+ // .map(|server_id| {
620
+ // ctx.subscribe::<ds::workflows::server::UpgradeStarted>(("server_id", server_id))
621
+ // })
622
+ // .buffer_unordered(32)
623
+ // .try_collect::<Vec<_>>()
624
+ // .await?;
621
625
622
626
futures_util:: stream:: iter ( list_res. server_ids )
623
627
. map ( |server_id| {
@@ -631,11 +635,11 @@ pub async fn upgrade_all(
631
635
. try_collect :: < Vec < _ > > ( )
632
636
. await ?;
633
637
634
- futures_util:: stream:: iter ( subs)
635
- . map ( |mut sub| async move { sub. next ( ) . await } )
636
- . buffer_unordered ( 32 )
637
- . try_collect :: < Vec < _ > > ( )
638
- . await ?;
638
+ // futures_util::stream::iter(subs)
639
+ // .map(|mut sub| async move { sub.next().await })
640
+ // .buffer_unordered(32)
641
+ // .try_collect::<Vec<_>>()
642
+ // .await?;
639
643
640
644
if count < 10_000 {
641
645
break ;
0 commit comments