6
6
using System . Collections . Generic ;
7
7
using System . Diagnostics ;
8
8
using System . IO ;
9
+ using System . IO . Pipelines ;
9
10
using System . Linq ;
10
11
using System . Security . Claims ;
11
12
using System . Text ;
13
+ using System . Threading ;
12
14
using System . Threading . Tasks ;
13
15
using MessagePack ;
14
16
using MessagePack . Formatters ;
@@ -2797,6 +2799,78 @@ public async Task ReceivingMessagesPreventsConnectionTimeoutFromOccuring()
2797
2799
}
2798
2800
}
2799
2801
2802
+ internal class PipeReaderWrapper : PipeReader
2803
+ {
2804
+ private readonly PipeReader _originalPipeReader ;
2805
+ private TaskCompletionSource < object > _waitForRead ;
2806
+ private object _lock = new object ( ) ;
2807
+
2808
+ public PipeReaderWrapper ( PipeReader pipeReader )
2809
+ {
2810
+ _originalPipeReader = pipeReader ;
2811
+ _waitForRead = new TaskCompletionSource < object > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
2812
+ }
2813
+
2814
+ public override void AdvanceTo ( SequencePosition consumed ) =>
2815
+ _originalPipeReader . AdvanceTo ( consumed ) ;
2816
+
2817
+ public override void AdvanceTo ( SequencePosition consumed , SequencePosition examined ) =>
2818
+ _originalPipeReader . AdvanceTo ( consumed , examined ) ;
2819
+
2820
+ public override void CancelPendingRead ( ) =>
2821
+ _originalPipeReader . CancelPendingRead ( ) ;
2822
+
2823
+ public override void Complete ( Exception exception = null ) =>
2824
+ _originalPipeReader . Complete ( exception ) ;
2825
+
2826
+ public override async ValueTask < ReadResult > ReadAsync ( CancellationToken cancellationToken = default )
2827
+ {
2828
+ lock ( _lock )
2829
+ {
2830
+ _waitForRead . SetResult ( null ) ;
2831
+ }
2832
+
2833
+ try
2834
+ {
2835
+ return await _originalPipeReader . ReadAsync ( cancellationToken ) ;
2836
+ }
2837
+ finally
2838
+ {
2839
+ lock ( _lock )
2840
+ {
2841
+ _waitForRead = new TaskCompletionSource < object > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
2842
+ }
2843
+ }
2844
+ }
2845
+
2846
+ public override bool TryRead ( out ReadResult result ) =>
2847
+ _originalPipeReader . TryRead ( out result ) ;
2848
+
2849
+ public Task WaitForReadStart ( )
2850
+ {
2851
+ lock ( _lock )
2852
+ {
2853
+ return _waitForRead . Task ;
2854
+ }
2855
+ }
2856
+ }
2857
+
2858
+ internal class CustomDuplex : IDuplexPipe
2859
+ {
2860
+ private readonly IDuplexPipe _originalDuplexPipe ;
2861
+ public readonly PipeReaderWrapper WrappedPipeReader ;
2862
+
2863
+ public CustomDuplex ( IDuplexPipe duplexPipe )
2864
+ {
2865
+ _originalDuplexPipe = duplexPipe ;
2866
+ WrappedPipeReader = new PipeReaderWrapper ( _originalDuplexPipe . Input ) ;
2867
+ }
2868
+
2869
+ public PipeReader Input => WrappedPipeReader ;
2870
+
2871
+ public PipeWriter Output => _originalDuplexPipe . Output ;
2872
+ }
2873
+
2800
2874
[ Fact ]
2801
2875
public async Task HubMethodInvokeDoesNotCountTowardsClientTimeout ( )
2802
2876
{
@@ -2813,6 +2887,9 @@ public async Task HubMethodInvokeDoesNotCountTowardsClientTimeout()
2813
2887
2814
2888
using ( var client = new TestClient ( new JsonHubProtocol ( ) ) )
2815
2889
{
2890
+ var customDuplex = new CustomDuplex ( client . Connection . Transport ) ;
2891
+ client . Connection . Transport = customDuplex ;
2892
+
2816
2893
var connectionHandlerTask = await client . ConnectAsync ( connectionHandler ) ;
2817
2894
// This starts the timeout logic
2818
2895
await client . SendHubMessageAsync ( PingMessage . Instance ) ;
@@ -2829,6 +2906,11 @@ public async Task HubMethodInvokeDoesNotCountTowardsClientTimeout()
2829
2906
2830
2907
await hubMethodTask . OrTimeout ( ) ;
2831
2908
2909
+ // There is a small window when the hub method finishes and the timer starts again
2910
+ // So we need to delay a little before ticking the heart beat.
2911
+ // We do this by waiting until we know the HubConnectionHandler code is in pipe.ReadAsync()
2912
+ await customDuplex . WrappedPipeReader . WaitForReadStart ( ) . OrTimeout ( ) ;
2913
+
2832
2914
// Tick heartbeat again now that we're outside of the hub method
2833
2915
client . TickHeartbeat ( ) ;
2834
2916
0 commit comments