36
36
from .message import Message
37
37
from .types import (
38
38
P ,
39
+ PahoSocket ,
39
40
PayloadType ,
40
41
SocketOption ,
41
42
SubscribeTopic ,
42
43
T ,
43
44
WebSocketHeaders ,
44
- _PahoSocket ,
45
45
)
46
46
47
47
if sys .version_info >= (3 , 11 ):
@@ -178,7 +178,8 @@ class Client:
178
178
client when it disconnects. If ``False``, the client is a persistent client
179
179
and subscription information and queued messages will be retained when the
180
180
client disconnects.
181
- transport: The transport protocol to use. Either ``"tcp"``, ``"websockets"`` or ``"unix"``.
181
+ transport: The transport protocol to use. Either ``"tcp"``, ``"websockets"`` or
182
+ ``"unix"``.
182
183
timeout: The default timeout for all communication with the broker in seconds.
183
184
keepalive: The keepalive timeout for the client in seconds.
184
185
bind_address: The IP address of a local network interface to bind this client
@@ -205,7 +206,7 @@ class Client:
205
206
websocket_headers: The headers to use for websockets.
206
207
"""
207
208
208
- def __init__ ( # noqa: C901, PLR0912, PLR0913, PLR0915
209
+ def __init__ (
209
210
self ,
210
211
hostname : str ,
211
212
port : int = 1883 ,
@@ -253,7 +254,8 @@ def __init__( # noqa: C901, PLR0912, PLR0913, PLR0915
253
254
254
255
# Pending subscribe, unsubscribe, and publish calls
255
256
self ._pending_subscribes : dict [
256
- int , asyncio .Future [tuple [int , ...] | list [ReasonCode ]]
257
+ int ,
258
+ asyncio .Future [tuple [int , ...] | list [ReasonCode ]],
257
259
] = {}
258
260
self ._pending_unsubscribes : dict [int , asyncio .Event ] = {}
259
261
self ._pending_publishes : dict [int , asyncio .Event ] = {}
@@ -337,7 +339,11 @@ def __init__( # noqa: C901, PLR0912, PLR0913, PLR0915
337
339
338
340
if will is not None :
339
341
self ._client .will_set (
340
- will .topic , will .payload , will .qos , will .retain , will .properties
342
+ will .topic ,
343
+ will .payload ,
344
+ will .qos ,
345
+ will .retain ,
346
+ will .properties ,
341
347
)
342
348
343
349
if socket_options is None :
@@ -377,9 +383,9 @@ async def subscribe(
377
383
qos : int = 0 ,
378
384
options : SubscribeOptions | None = None ,
379
385
properties : Properties | None = None ,
380
- * args : Any ,
386
+ * args : Any , # noqa: ANN401
381
387
timeout : float | None = None ,
382
- ** kwargs : Any ,
388
+ ** kwargs : Any , # noqa: ANN401
383
389
) -> tuple [int , ...] | list [ReasonCode ]:
384
390
"""Subscribe to a topic or wildcard.
385
391
@@ -397,7 +403,12 @@ async def subscribe(
397
403
398
404
"""
399
405
result , mid = self ._client .subscribe (
400
- topic , qos , options , properties , * args , ** kwargs
406
+ topic ,
407
+ qos ,
408
+ options ,
409
+ properties ,
410
+ * args ,
411
+ ** kwargs ,
401
412
)
402
413
# Early out on error
403
414
if result != mqtt .MQTT_ERR_SUCCESS or mid is None :
@@ -416,9 +427,9 @@ async def unsubscribe(
416
427
/ ,
417
428
topic : str | list [str ],
418
429
properties : Properties | None = None ,
419
- * args : Any ,
430
+ * args : Any , # noqa: ANN401
420
431
timeout : float | None = None ,
421
- ** kwargs : Any ,
432
+ ** kwargs : Any , # noqa: ANN401
422
433
) -> None :
423
434
"""Unsubscribe from a topic or wildcard.
424
435
@@ -443,17 +454,17 @@ async def unsubscribe(
443
454
await self ._wait_for (confirmation .wait (), timeout = timeout )
444
455
445
456
@_outgoing_call
446
- async def publish ( # noqa: PLR0913
457
+ async def publish (
447
458
self ,
448
459
/ ,
449
460
topic : str ,
450
461
payload : PayloadType = None ,
451
462
qos : int = 0 ,
452
- retain : bool = False ,
463
+ retain : bool = False , # noqa: FBT001, FBT002
453
464
properties : Properties | None = None ,
454
- * args : Any ,
465
+ * args : Any , # noqa: ANN401
455
466
timeout : float | None = None ,
456
- ** kwargs : Any ,
467
+ ** kwargs : Any , # noqa: ANN401
457
468
) -> None :
458
469
"""Publish a message to the broker.
459
470
@@ -471,7 +482,13 @@ async def publish( # noqa: PLR0913
471
482
method.
472
483
"""
473
484
info = self ._client .publish (
474
- topic , payload , qos , retain , properties , * args , ** kwargs
485
+ topic ,
486
+ payload ,
487
+ qos ,
488
+ retain ,
489
+ properties ,
490
+ * args ,
491
+ ** kwargs ,
475
492
) # [2]
476
493
# Early out on error
477
494
if info .rc != mqtt .MQTT_ERR_SUCCESS :
@@ -485,22 +502,23 @@ async def publish( # noqa: PLR0913
485
502
# Wait for confirmation
486
503
await self ._wait_for (confirmation .wait (), timeout = timeout )
487
504
488
- async def _wait_for (
489
- self , fut : Awaitable [T ], timeout : float | None , ** kwargs : Any
490
- ) -> T :
505
+ async def _wait_for (self , fut : Awaitable [T ], timeout : float | None ) -> T :
491
506
if timeout is None :
492
507
timeout = self .timeout
493
508
# Note that asyncio uses `None` to mean "No timeout". We use `math.inf`.
494
509
timeout_for_asyncio = None if timeout == math .inf else timeout
495
510
try :
496
- return await asyncio .wait_for (fut , timeout = timeout_for_asyncio , ** kwargs )
511
+ return await asyncio .wait_for (fut , timeout = timeout_for_asyncio )
497
512
except asyncio .TimeoutError :
498
513
msg = "Operation timed out"
499
514
raise MqttError (msg ) from None
500
515
501
516
@contextlib .contextmanager
502
517
def _pending_call (
503
- self , mid : int , value : T , pending_dict : dict [int , T ]
518
+ self ,
519
+ mid : int ,
520
+ value : T ,
521
+ pending_dict : dict [int , T ],
504
522
) -> Iterator [None ]:
505
523
if mid in self ._pending_calls :
506
524
msg = f'There already exists a pending call for message ID "{ mid } "'
@@ -520,18 +538,16 @@ def _pending_call(
520
538
#
521
539
# However, if the callback doesn't get called (e.g., due to a
522
540
# network error) we still need to remove the item from the dict.
523
- try :
541
+ with contextlib . suppress ( KeyError ) :
524
542
del pending_dict [mid ]
525
- except KeyError :
526
- pass
527
543
528
544
def _on_connect (
529
545
self ,
530
- client : mqtt .Client ,
531
- userdata : Any ,
532
- flags : mqtt .ConnectFlags ,
546
+ client : mqtt .Client , # noqa: ARG002
547
+ userdata : Any , # noqa: ARG002, ANN401
548
+ flags : mqtt .ConnectFlags , # noqa: ARG002
533
549
reason_code : ReasonCode ,
534
- properties : Properties | None = None ,
550
+ properties : Properties | None = None , # noqa: ARG002
535
551
) -> None :
536
552
"""Called when we receive a CONNACK message from the broker."""
537
553
# Return early if already connected. Sometimes, paho-mqtt calls _on_connect
@@ -548,11 +564,11 @@ def _on_connect(
548
564
549
565
def _on_disconnect (
550
566
self ,
551
- client : mqtt .Client ,
552
- userdata : Any ,
553
- flags : mqtt .DisconnectFlags ,
567
+ client : mqtt .Client , # noqa: ARG002
568
+ userdata : Any , # noqa: ARG002, ANN401
569
+ flags : mqtt .DisconnectFlags , # noqa: ARG002
554
570
reason_code : ReasonCode ,
555
- properties : Properties | None = None ,
571
+ properties : Properties | None = None , # noqa: ARG002
556
572
) -> None :
557
573
# Return early if the disconnect is already acknowledged.
558
574
# Sometimes (e.g., due to timeouts), paho-mqtt calls _on_disconnect
@@ -575,16 +591,16 @@ def _on_disconnect(
575
591
self ._disconnected .set_result (None )
576
592
else :
577
593
self ._disconnected .set_exception (
578
- MqttCodeError (reason_code , "Unexpected disconnection" )
594
+ MqttCodeError (reason_code , "Unexpected disconnection" ),
579
595
)
580
596
581
597
def _on_subscribe (
582
598
self ,
583
- client : mqtt .Client ,
584
- userdata : Any ,
599
+ client : mqtt .Client , # noqa: ARG002
600
+ userdata : Any , # noqa: ARG002, ANN401
585
601
mid : int ,
586
602
reason_codes : list [ReasonCode ],
587
- properties : Properties | None = None ,
603
+ properties : Properties | None = None , # noqa: ARG002
588
604
) -> None :
589
605
"""Called when we receive a SUBACK message from the broker."""
590
606
try :
@@ -593,27 +609,32 @@ def _on_subscribe(
593
609
fut .set_result (reason_codes )
594
610
except KeyError :
595
611
self ._logger .exception (
596
- 'Unexpected message ID "%d" in on_subscribe callback' , mid
612
+ 'Unexpected message ID "%d" in on_subscribe callback' ,
613
+ mid ,
597
614
)
598
615
599
616
def _on_unsubscribe (
600
617
self ,
601
- client : mqtt .Client ,
602
- userdata : Any ,
618
+ client : mqtt .Client , # noqa: ARG002
619
+ userdata : Any , # noqa: ARG002, ANN401
603
620
mid : int ,
604
- reason_codes : list [ReasonCode ],
605
- properties : Properties | None = None ,
621
+ reason_codes : list [ReasonCode ], # noqa: ARG002
622
+ properties : Properties | None = None , # noqa: ARG002
606
623
) -> None :
607
624
"""Called when we receive an UNSUBACK message from the broker."""
608
625
try :
609
626
self ._pending_unsubscribes .pop (mid ).set ()
610
627
except KeyError :
611
628
self ._logger .exception (
612
- 'Unexpected message ID "%d" in on_unsubscribe callback' , mid
629
+ 'Unexpected message ID "%d" in on_unsubscribe callback' ,
630
+ mid ,
613
631
)
614
632
615
633
def _on_message (
616
- self , client : mqtt .Client , userdata : Any , message : mqtt .MQTTMessage
634
+ self ,
635
+ client : mqtt .Client , # noqa: ARG002
636
+ userdata : Any , # noqa: ARG002, ANN401
637
+ message : mqtt .MQTTMessage ,
617
638
) -> None :
618
639
# Convert the paho.mqtt message into our own Message type
619
640
m = Message ._from_paho_message (message ) # noqa: SLF001
@@ -625,30 +646,31 @@ def _on_message(
625
646
626
647
def _on_publish (
627
648
self ,
628
- client : mqtt .Client ,
629
- userdata : Any ,
649
+ client : mqtt .Client , # noqa: ARG002
650
+ userdata : Any , # noqa: ARG002, ANN401
630
651
mid : int ,
631
- reason_code : ReasonCode ,
632
- properties : Properties ,
652
+ reason_code : ReasonCode , # noqa: ARG002
653
+ properties : Properties , # noqa: ARG002
633
654
) -> None :
634
- try :
655
+ # Suppress KeyError since [2] may call on_publish before it even returns.
656
+ # That is, the message may already be published before we even get a chance to
657
+ # set up the 'pending_call' logic.
658
+ with contextlib .suppress (KeyError ):
635
659
self ._pending_publishes .pop (mid ).set ()
636
- except KeyError :
637
- # Do nothing since [2] may call on_publish before it even returns.
638
- # That is, the message may already be published before we even get a
639
- # chance to set up the 'pending_call' logic.
640
- pass
641
660
642
661
def _on_socket_open (
643
- self , client : mqtt .Client , userdata : Any , sock : _PahoSocket
662
+ self ,
663
+ client : mqtt .Client ,
664
+ userdata : Any , # noqa: ARG002, ANN401
665
+ sock : PahoSocket ,
644
666
) -> None :
645
667
def callback () -> None :
646
668
# client.loop_read() may raise an exception, such as BadPipe. It's
647
669
# usually a sign that the underlying connection broke, therefore we
648
670
# disconnect straight away
649
671
try :
650
672
client .loop_read ()
651
- except Exception as exc :
673
+ except Exception as exc : # noqa: BLE001
652
674
if not self ._disconnected .done ():
653
675
self ._disconnected .set_exception (exc )
654
676
@@ -662,7 +684,10 @@ def create_misc_task() -> None:
662
684
self ._loop .call_soon_threadsafe (create_misc_task )
663
685
664
686
def _on_socket_close (
665
- self , client : mqtt .Client , userdata : Any , sock : _PahoSocket
687
+ self ,
688
+ client : mqtt .Client , # noqa: ARG002
689
+ userdata : Any , # noqa: ARG002, ANN401
690
+ sock : PahoSocket ,
666
691
) -> None :
667
692
fileno = sock .fileno ()
668
693
if fileno > - 1 :
@@ -671,30 +696,36 @@ def _on_socket_close(
671
696
self ._loop .call_soon_threadsafe (self ._misc_task .cancel )
672
697
673
698
def _on_socket_register_write (
674
- self , client : mqtt .Client , userdata : Any , sock : _PahoSocket
699
+ self ,
700
+ client : mqtt .Client ,
701
+ userdata : Any , # noqa: ARG002, ANN401
702
+ sock : PahoSocket ,
675
703
) -> None :
676
704
def callback () -> None :
677
705
# client.loop_write() may raise an exception, such as BadPipe. It's
678
706
# usually a sign that the underlying connection broke, therefore we
679
707
# disconnect straight away
680
708
try :
681
709
client .loop_write ()
682
- except Exception as exc :
710
+ except Exception as exc : # noqa: BLE001
683
711
if not self ._disconnected .done ():
684
712
self ._disconnected .set_exception (exc )
685
713
686
- # paho-mqtt may call this function from the executor thread on which we've called
687
- # `self._client.connect()` (see [3]), so we can't do most operations on
714
+ # paho-mqtt may call this function from the executor thread on which we've
715
+ # called `self._client.connect()` (see [3]), so we can't do most operations on
688
716
# self._loop directly.
689
717
self ._loop .call_soon_threadsafe (self ._loop .add_writer , sock .fileno (), callback )
690
718
691
719
def _on_socket_unregister_write (
692
- self , client : mqtt .Client , userdata : Any , sock : _PahoSocket
720
+ self ,
721
+ client : mqtt .Client , # noqa: ARG002
722
+ userdata : Any , # noqa: ARG002, ANN401
723
+ sock : PahoSocket ,
693
724
) -> None :
694
725
self ._loop .remove_writer (sock .fileno ())
695
726
696
727
async def _misc_loop (self ) -> None :
697
- while self ._client .loop_misc () == mqtt .MQTT_ERR_SUCCESS :
728
+ while self ._client .loop_misc () == mqtt .MQTT_ERR_SUCCESS : # noqa: ASYNC110
698
729
await asyncio .sleep (1 )
699
730
700
731
async def __aenter__ (self ) -> Self :
@@ -761,7 +792,8 @@ async def __aexit__(
761
792
self ._connected = asyncio .Future ()
762
793
else :
763
794
self ._logger .warning (
764
- "Could not gracefully disconnect: %d. Forcing disconnection." , rc
795
+ "Could not gracefully disconnect: %d. Forcing disconnection." ,
796
+ rc ,
765
797
)
766
798
# Force disconnection if we cannot gracefully disconnect
767
799
if not self ._disconnected .done ():
@@ -772,7 +804,8 @@ async def __aexit__(
772
804
773
805
774
806
def _set_client_socket_defaults (
775
- client_socket : _PahoSocket | None , socket_options : Iterable [SocketOption ]
807
+ client_socket : PahoSocket | None ,
808
+ socket_options : Iterable [SocketOption ],
776
809
) -> None :
777
810
# Note that socket may be None if, e.g., the username and
778
811
# password combination didn't work. In this case, we return early.
0 commit comments