Skip to content

Commit e9b6146

Browse files
committed
HandshakeAck::buffer_sizes() replaces individual methods for buffer sizes
1 parent 60c7c89 commit e9b6146

File tree

9 files changed

+102
-54
lines changed

9 files changed

+102
-54
lines changed

CHANGES.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changes
22

3+
## [0.6.3] - 2021-03-15
4+
5+
* `HandshakeAck::buffer_sizes()` replaces individual methods for buffer sizes
6+
37
## [0.6.2] - 2021-03-04
48

59
* Allow to override io buffer params

Cargo.toml

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-mqtt"
3-
version = "0.6.2"
3+
version = "0.6.3"
44
authors = ["ntex contributors <[email protected]>"]
55
description = "MQTT Client/Server framework for v5 and v3.1.1 protocols"
66
documentation = "https://docs.rs/ntex-mqtt"
@@ -12,7 +12,7 @@ exclude = [".gitignore", ".travis.yml", ".cargo/config"]
1212
edition = "2018"
1313

1414
[dependencies]
15-
ntex = "0.3.5"
15+
ntex = "0.3.9"
1616
bitflags = "1.2.1"
1717
derive_more = "0.99.11"
1818
futures = "0.3.13"
@@ -29,4 +29,4 @@ tokio-rustls = "0.22.0"
2929
openssl = "0.10"
3030
tokio-openssl = "0.6.1"
3131

32-
ntex = { version = "0.3.5", features = ["rustls", "openssl"] }
32+
ntex = { version = "0.3.9", features = ["rustls", "openssl"] }

src/io.rs

+25-20
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77

88
use futures::FutureExt;
99

10-
pub(crate) use ntex::framed::{DispatchItem, ReadTask, State, Timer, WriteTask};
10+
pub(crate) use ntex::framed::{DispatchItem, ReadTask, State, Timer, Write, WriteTask};
1111

1212
use ntex::codec::{AsyncRead, AsyncWrite, Decoder, Encoder};
1313
use ntex::service::{IntoService, Service};
@@ -201,7 +201,7 @@ where
201201
&mut self,
202202
item: Result<S::Response, S::Error>,
203203
response_idx: usize,
204-
state: &State,
204+
write: Write<'_>,
205205
codec: &U,
206206
wake: bool,
207207
) {
@@ -211,21 +211,21 @@ where
211211
if idx == 0 {
212212
let _ = self.queue.pop_front();
213213
self.base = self.base.wrapping_add(1);
214-
if let Err(err) = state.write_result(item, codec) {
214+
if let Err(err) = write.encode_result(item, codec) {
215215
self.error = Some(err.into());
216216
}
217217

218218
// check remaining response
219219
while let Some(item) = self.queue.front_mut().and_then(|v| v.take()) {
220220
let _ = self.queue.pop_front();
221221
self.base = self.base.wrapping_add(1);
222-
if let Err(err) = state.write_result(item, codec) {
222+
if let Err(err) = write.encode_result(item, codec) {
223223
self.error = Some(err.into());
224224
}
225225
}
226226

227227
if wake && self.queue.is_empty() {
228-
state.dsp_wake_task()
228+
write.wake_dispatcher()
229229
}
230230
} else {
231231
self.queue[idx] = ServiceResult::Ready(item);
@@ -243,6 +243,8 @@ where
243243

244244
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
245245
let mut this = self.as_mut().project();
246+
let read = this.state.read();
247+
let write = this.state.write();
246248

247249
// log::trace!("IO-DISP poll :{:?}:", this.st);
248250

@@ -254,7 +256,7 @@ where
254256
this.inner.borrow_mut().handle_result(
255257
item,
256258
*this.response_idx,
257-
this.state,
259+
write,
258260
this.codec,
259261
false,
260262
);
@@ -273,7 +275,7 @@ where
273275
let mut retry = false;
274276

275277
// service is ready, wake io read task
276-
this.state.dsp_restart_read_task();
278+
read.resume();
277279

278280
// check keepalive timeout
279281
if this.state.is_keepalive() {
@@ -282,10 +284,10 @@ where
282284
if inner.error.is_none() {
283285
inner.error = Some(IoDispatcherError::KeepAlive);
284286
}
285-
this.state.dsp_mark_stopped();
287+
this.state.dispatcher_stopped();
286288
}
287289

288-
let item = if this.state.is_dsp_stopped() {
290+
let item = if this.state.is_dispatcher_stopped() {
289291
log::trace!("dispatcher is instructed to stop");
290292
let mut inner = this.inner.borrow_mut();
291293

@@ -314,8 +316,8 @@ where
314316
item
315317
} else {
316318
// decode incoming bytes stream
317-
if this.state.is_read_ready() {
318-
match this.state.decode_item(this.codec) {
319+
if read.is_ready() {
320+
match read.decode(this.codec) {
319321
Ok(Some(el)) => {
320322
// update keep-alive timer
321323
if *this.keepalive_timeout != 0 {
@@ -337,7 +339,7 @@ where
337339
}
338340
Ok(None) => {
339341
// log::trace!("not enough data to decode next frame, register dispatch task");
340-
this.state.dsp_read_more_data(cx.waker());
342+
read.wake(cx.waker());
341343
return Poll::Pending;
342344
}
343345
Err(err) => {
@@ -359,7 +361,7 @@ where
359361
}
360362
}
361363
} else {
362-
this.state.dsp_register_task(cx.waker());
364+
this.state.register_dispatcher(cx.waker());
363365
return Poll::Pending;
364366
}
365367
};
@@ -380,7 +382,7 @@ where
380382
// check if current result is only response atm
381383
if inner.queue.is_empty() {
382384
if let Err(err) =
383-
this.state.write_result(res, this.codec)
385+
write.encode_result(res, this.codec)
384386
{
385387
inner.error = Some(err.into());
386388
}
@@ -406,7 +408,7 @@ where
406408
inner.borrow_mut().handle_result(
407409
item,
408410
response_idx,
409-
&st,
411+
st.write(),
410412
&codec,
411413
true,
412414
);
@@ -422,7 +424,7 @@ where
422424
Poll::Pending => {
423425
// pause io read task
424426
log::trace!("service is not ready, register dispatch task");
425-
this.state.dsp_service_not_ready(cx.waker());
427+
read.pause(cx.waker());
426428
return Poll::Pending;
427429
}
428430
Poll::Ready(Err(err)) => {
@@ -431,7 +433,7 @@ where
431433
*this.st = IoDispatcherState::Stop;
432434
this.inner.borrow_mut().error =
433435
Some(IoDispatcherError::Service(err));
434-
this.state.dsp_mark_stopped();
436+
this.state.dispatcher_stopped();
435437

436438
// unregister keep-alive timer
437439
if *this.keepalive_timeout != 0 {
@@ -457,7 +459,7 @@ where
457459
*this.st = IoDispatcherState::Shutdown;
458460
self.poll(cx)
459461
} else {
460-
this.state.dsp_register_task(cx.waker());
462+
this.state.register_dispatcher(cx.waker());
461463
Poll::Pending
462464
}
463465
}
@@ -636,7 +638,7 @@ mod tests {
636638
let buf = client.read().await.unwrap();
637639
assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"));
638640

639-
assert!(st.write_item(Bytes::from_static(b"test"), &BytesCodec).is_ok());
641+
assert!(st.write().encode(Bytes::from_static(b"test"), &BytesCodec).is_ok());
640642
let buf = client.read().await.unwrap();
641643
assert_eq!(buf, Bytes::from_static(b"test"));
642644

@@ -662,7 +664,10 @@ mod tests {
662664
);
663665
ntex::rt::spawn(disp.map(|_| ()));
664666

665-
state.write_item(Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"), &BytesCodec).unwrap();
667+
state
668+
.write()
669+
.encode(Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"), &BytesCodec)
670+
.unwrap();
666671

667672
// buffer should be flushed
668673
client.remote_buffer_cap(1024);

src/v3/handshake.rs

+22-2
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,24 @@ impl<Io, St> HandshakeAck<Io, St> {
139139
}
140140

141141
#[inline]
142+
/// Set read/write buffer sizes
143+
///
144+
/// By default max buffer size is 4kb for both read and write buffer,
145+
/// Min size is 256 bytes.
146+
pub fn buffer_sizes(
147+
mut self,
148+
max_read_buf: u16,
149+
max_write_buf: u16,
150+
min_buf_size: u16,
151+
) -> Self {
152+
self.read_hw = max_read_buf;
153+
self.write_hw = max_write_buf;
154+
self.lw = min_buf_size;
155+
self
156+
}
157+
158+
#[doc(hidden)]
159+
#[deprecated(since = "0.6.3")]
142160
/// Set buffer low watermark size
143161
///
144162
/// Low watermark is the same for read and write buffers.
@@ -148,7 +166,8 @@ impl<Io, St> HandshakeAck<Io, St> {
148166
self
149167
}
150168

151-
#[inline]
169+
#[doc(hidden)]
170+
#[deprecated(since = "0.6.3")]
152171
/// Set read buffer high water mark size
153172
///
154173
/// By default read hw is 4kb
@@ -157,7 +176,8 @@ impl<Io, St> HandshakeAck<Io, St> {
157176
self
158177
}
159178

160-
#[inline]
179+
#[doc(hidden)]
180+
#[deprecated(since = "0.6.3")]
161181
/// Set write buffer high watermark size
162182
///
163183
/// By default write hw is 4kb

src/v3/server.rs

+2-6
Original file line numberDiff line numberDiff line change
@@ -388,12 +388,8 @@ where
388388

389389
log::trace!("Sending success handshake ack: {:#?}", pkt);
390390

391-
state
392-
.low_watermark(ack.lw)
393-
.read_high_watermark(ack.read_hw)
394-
.write_high_watermark(ack.write_hw)
395-
.send(&mut ack.io, &ack.shared.codec, pkt)
396-
.await?;
391+
state.set_buffer_sizes(ack.read_hw, ack.write_hw, ack.lw);
392+
state.send(&mut ack.io, &ack.shared.codec, pkt).await?;
397393

398394
Ok((
399395
ack.io,

src/v3/sink.rs

+7-6
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ impl MqttSink {
5454
/// responses, but it flushes buffers.
5555
pub fn force_close(&self) {
5656
if self.0.state.is_open() {
57-
let _ = self.0.state.shutdown();
57+
let _ = self.0.state.force_close();
5858
}
5959
let mut queues = self.0.queues.borrow_mut();
6060
queues.inflight.clear();
@@ -63,7 +63,7 @@ impl MqttSink {
6363

6464
/// Send ping
6565
pub(super) fn ping(&self) -> bool {
66-
self.0.state.write_item(codec::Packet::PingRequest, &self.0.codec).is_ok()
66+
self.0.state.write().encode(codec::Packet::PingRequest, &self.0.codec).is_ok()
6767
}
6868

6969
/// Create publish message builder
@@ -179,7 +179,8 @@ impl PublishBuilder {
179179
log::trace!("Publish (QoS-0) to {:?}", packet.topic);
180180
self.shared
181181
.state
182-
.write_item(codec::Packet::Publish(packet), &self.shared.codec)
182+
.write()
183+
.encode(codec::Packet::Publish(packet), &self.shared.codec)
183184
.map_err(SendPacketError::Encode)
184185
.map(|_| ())
185186
} else {
@@ -224,7 +225,7 @@ impl PublishBuilder {
224225

225226
log::trace!("Publish (QoS1) to {:#?}", packet);
226227

227-
match shared.state.write_item(codec::Packet::Publish(packet), &shared.codec) {
228+
match shared.state.write().encode(codec::Packet::Publish(packet), &shared.codec) {
228229
Ok(_) => {
229230
// do not borrow cross yield points
230231
drop(queues);
@@ -296,7 +297,7 @@ impl SubscribeBuilder {
296297
// send subscribe to client
297298
log::trace!("Sending subscribe packet id: {} filters:{:?}", idx, filters);
298299

299-
match shared.state.write_item(
300+
match shared.state.write().encode(
300301
codec::Packet::Subscribe {
301302
packet_id: NonZeroU16::new(idx).unwrap(),
302303
topic_filters: filters,
@@ -377,7 +378,7 @@ impl UnsubscribeBuilder {
377378
// send subscribe to client
378379
log::trace!("Sending unsubscribe packet id: {} filters:{:?}", idx, filters);
379380

380-
match shared.state.write_item(
381+
match shared.state.write().encode(
381382
codec::Packet::Unsubscribe {
382383
packet_id: NonZeroU16::new(idx).unwrap(),
383384
topic_filters: filters,

src/v5/handshake.rs

+22-2
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,24 @@ impl<Io, St> HandshakeAck<Io, St> {
130130
}
131131

132132
#[inline]
133+
/// Set read/write buffer sizes
134+
///
135+
/// By default max buffer size is 4kb for both read and write buffer,
136+
/// Min size is 256 bytes.
137+
pub fn buffer_sizes(
138+
mut self,
139+
max_read_buf: u16,
140+
max_write_buf: u16,
141+
min_buf_size: u16,
142+
) -> Self {
143+
self.read_hw = max_read_buf;
144+
self.write_hw = max_write_buf;
145+
self.lw = min_buf_size;
146+
self
147+
}
148+
149+
#[doc(hidden)]
150+
#[deprecated(since = "0.6.3")]
133151
/// Set buffer low watermark size
134152
///
135153
/// Low watermark is the same for read and write buffers.
@@ -139,7 +157,8 @@ impl<Io, St> HandshakeAck<Io, St> {
139157
self
140158
}
141159

142-
#[inline]
160+
#[doc(hidden)]
161+
#[deprecated(since = "0.6.3")]
143162
/// Set read buffer high water mark size
144163
///
145164
/// By default read hw is 4kb
@@ -148,7 +167,8 @@ impl<Io, St> HandshakeAck<Io, St> {
148167
self
149168
}
150169

151-
#[inline]
170+
#[doc(hidden)]
171+
#[deprecated(since = "0.6.3")]
152172
/// Set write buffer high watermark size
153173
///
154174
/// By default write hw is 4kb

src/v5/server.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -456,10 +456,8 @@ where
456456
ack.packet.server_keepalive_sec = Some(ack.keepalive as u16);
457457
}
458458

459+
state.set_buffer_sizes(ack.read_hw, ack.write_hw, ack.lw);
459460
state
460-
.low_watermark(ack.lw)
461-
.read_high_watermark(ack.read_hw)
462-
.write_high_watermark(ack.write_hw)
463461
.send(&mut ack.io, &shared.codec, mqtt::Packet::ConnectAck(ack.packet))
464462
.await?;
465463

@@ -483,7 +481,8 @@ where
483481
&& ack
484482
.shared
485483
.state
486-
.write_item(mqtt::Packet::ConnectAck(ack.packet), &ack.shared.codec)
484+
.write()
485+
.encode(mqtt::Packet::ConnectAck(ack.packet), &ack.shared.codec)
487486
.is_ok()
488487
{
489488
WriteTask::shutdown(

0 commit comments

Comments
 (0)