Skip to content

Commit 0513f5c

Browse files
committed
FIX: handle none workerstate in StealResponse
1 parent 2927b96 commit 0513f5c

File tree

4 files changed

+27
-8
lines changed

4 files changed

+27
-8
lines changed

src/protocol/protocol.rs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -575,10 +575,7 @@ mod tests {
575575
task_spec_to_memory, ClientTaskSpec, DirectTaskSpec, FromClientMessage, KeyInMemoryMsg,
576576
ToClientMessage, UpdateGraphMsg,
577577
};
578-
use crate::protocol::protocol::{
579-
asyncwrite_to_sink, serialize_single_packet, split_packet_into_parts, Batch, DaskCodec,
580-
DaskPacket, DaskPacketPart, Frame, MessageWrapper, SerializedMemory,
581-
};
578+
use crate::protocol::protocol::{asyncwrite_to_sink, serialize_single_packet, split_packet_into_parts, Batch, DaskCodec, DaskPacket, DaskPacketPart, Frame, MessageWrapper, SerializedMemory, SerializedTransport};
582579
use crate::Result;
583580
use bytes::{Buf, BufMut, BytesMut};
584581
use futures::SinkExt;
@@ -587,7 +584,7 @@ mod tests {
587584
use crate::common::Map;
588585
use crate::protocol::key::{to_dask_key, DaskKey};
589586
use crate::protocol::protocol::IntoInner;
590-
use crate::protocol::workermsg::RegisterWorkerResponseMsg;
587+
use crate::protocol::workermsg::{RegisterWorkerResponseMsg, FromWorkerMessage};
591588
use crate::test_util::{bytes_to_msg, load_bin_test_data};
592589
use std::collections::hash_map::DefaultHasher;
593590
use std::hash::Hasher;
@@ -890,6 +887,27 @@ mod tests {
890887
Ok(())
891888
}
892889

890+
#[tokio::test]
891+
async fn parse_steal_response_state_none() -> Result<()> {
892+
let main = load_bin_test_data("data/steal-response-state-none.bin");
893+
let msg: MessageWrapper<FromWorkerMessage<SerializedTransport>> =
894+
rmp_serde::from_slice(main.as_slice())?;
895+
match msg {
896+
MessageWrapper::MessageList(v) => {
897+
assert_eq!(v.len(), 1);
898+
match &v[0] {
899+
FromWorkerMessage::StealResponse(msg) => {
900+
assert!(msg.state.is_none());
901+
}
902+
_ => panic!()
903+
}
904+
}
905+
_ => panic!(),
906+
}
907+
908+
Ok(())
909+
}
910+
893911
#[tokio::test]
894912
async fn serialize_key_in_memory() -> Result<()> {
895913
let msg = ToClientMessage::KeyInMemory(KeyInMemoryMsg {

src/protocol/workermsg.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,13 +188,13 @@ pub enum WorkerState {
188188
Error,
189189
Rescheduled,
190190
Constrained,
191-
LongRunning,
191+
LongRunning
192192
}
193193

194194
#[derive(Serialize, Deserialize, Debug)]
195195
pub struct StealResponseMsg {
196196
pub key: DaskKey,
197-
pub state: WorkerState,
197+
pub state: Option<WorkerState>,
198198
}
199199

200200
#[derive(Serialize, Deserialize, Debug)]

src/server/core.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ impl Core {
345345
};
346346

347347
// This needs to correspond with behavior in worker!
348-
let success = match msg.state {
348+
let success = match msg.state.unwrap_or(WorkerState::Error) {
349349
WorkerState::Waiting | WorkerState::Ready => true,
350350
_ => false,
351351
};
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
���op�steal-response�key�<('sum-partial-4372a19b33e6131e2a9e5de4ef35499c', 0, 0, 0, 4)�state�

0 commit comments

Comments
 (0)