@@ -68,19 +68,28 @@ def output_reader(proc, outq):
68
68
for line in iter (proc .stdout .readline , b'' ):
69
69
outq .put (line .decode ('utf-8' ))
70
70
71
+
72
+ barrier = threading .Barrier (2 , timeout = 30 )
73
+ num_stop_signals = 0
74
+ run = True
75
+
76
+
71
77
def communication (monitor_proc , pid ):
72
78
"""A"""
79
+ global num_stop_signals
80
+ global run
81
+ global barrier
73
82
outq = queue .Queue ()
74
83
t = threading .Thread (target = output_reader , args = (monitor_proc ,outq ))
75
84
t .start ()
76
85
77
- run = True
78
86
try :
79
87
time .sleep (0.2 )
80
-
88
+
81
89
while run :
82
90
try :
83
91
line = outq .get (block = False ).rstrip ()
92
+ print (line )
84
93
85
94
sys .stdout .flush ()
86
95
@@ -92,26 +101,39 @@ def communication(monitor_proc, pid):
92
101
print ("___" + pid + "___Creating subscriber___" )
93
102
sys .stdout .flush ()
94
103
subscriber1_proc = subprocess .Popen ([subscriber_command , "--seed" , pid ]
95
- + (["--xmlfile" , real_xml_file_sub ] if real_xml_file_sub else []))
96
-
104
+ + (["--xmlfile" , real_xml_file_sub ] if real_xml_file_sub else []),
105
+ stdin = subprocess .PIPE , stdout = subprocess .PIPE )
106
+
97
107
print ("___" + pid + "___Creating publisher___" )
98
108
sys .stdout .flush ()
99
109
publisher_proc1 = subprocess .Popen ([publisher_command , "--seed" , pid , "--wait" , "1" ]
100
110
+ (["--xmlfile" , real_xml_file_pub ] if real_xml_file_pub else [])
101
111
+ extra_pub_args )
102
112
113
+ line = ""
114
+ while 'Subscriber finished receiving samples' != line :
115
+ line = subscriber1_proc .stdout .readline ().decode ('utf-8' ).rstrip ()
116
+ print (line )
117
+ sys .stdout .flush ()
118
+
119
+ print ("___" + pid + "___barrier...___" )
120
+ sys .stdout .flush ()
121
+ barrier .wait ()
122
+
103
123
print ("___" + pid + "___subscriber1 communicate...___" )
104
124
sys .stdout .flush ()
105
- publisher_proc1 .communicate ()
125
+ subscriber1_proc .communicate ('a' . encode ( 'utf-8' ) )
106
126
107
127
print ("___" + pid + "___publisher1 communicate...___" )
108
128
sys .stdout .flush ()
109
- subscriber1_proc .communicate ()
129
+ publisher_proc1 .communicate ()
110
130
111
131
elif (line == ("Stop Monitor_" + pid )):
112
132
print ("___" + pid + "___Stop Monitor___" )
113
133
sys .stdout .flush ()
114
- run = False
134
+ num_stop_signals += 1
135
+ if 2 == num_stop_signals :
136
+ run = False
115
137
116
138
else :
117
139
print ("___" + pid + '_ ' + line )
@@ -123,7 +145,7 @@ def communication(monitor_proc, pid):
123
145
124
146
time .sleep (0.1 )
125
147
finally :
126
- monitor_proc .terminate ( )
148
+ monitor_proc .communicate ( 'a' . encode ( 'utf-8' ) )
127
149
try :
128
150
monitor_proc .wait (timeout = 0.2 )
129
151
print ("___" + pid + '== subprocess exited with rc =' , monitor_proc .returncode )
@@ -138,10 +160,10 @@ def communication(monitor_proc, pid):
138
160
t .join ()
139
161
140
162
monitor_proc_0 = subprocess .Popen ([monitor_command , "--seed" , str (os .getpid ())],
141
- stdout = subprocess .PIPE , stderr = subprocess .STDOUT )
163
+ stdin = subprocess . PIPE , stdout = subprocess .PIPE , stderr = subprocess .STDOUT )
142
164
143
165
monitor_proc_1 = subprocess .Popen ([monitor_command , "--seed" , str (os .getpid () + 1 )],
144
- stdout = subprocess .PIPE , stderr = subprocess .STDOUT )
166
+ stdin = subprocess . PIPE , stdout = subprocess .PIPE , stderr = subprocess .STDOUT )
145
167
146
168
t_0 = threading .Thread (target = communication , args = (monitor_proc_0 ,str (os .getpid ())))
147
169
t_1 = threading .Thread (target = communication , args = (monitor_proc_1 ,str (os .getpid () + 1 )))
0 commit comments