Skip to content

Commit 8a6bf43

Browse files
author
Emily Giurleo
committed
RUBY-2234 Fix error on large bulk writes with zlib (#2026)
1 parent beb5ecb commit 8a6bf43

File tree

3 files changed: +53 −12 lines changed

Diff for: lib/mongo/server/connection_base.rb

+34-8
```diff
@@ -186,9 +186,6 @@ def deliver(message, client, options = {})
       end

       def serialize(message, client, buffer = BSON::ByteBuffer.new)
-        start_size = 0
-        final_message = message.maybe_compress(compressor, options[:zlib_compression_level])
-
         # Driver specifications only mandate the fixed 16MiB limit for
         # serialized BSON documents. However, the server returns its
         # active serialized BSON document size limit in the ismaster response,
@@ -213,12 +210,41 @@ def serialize(message, client, buffer = BSON::ByteBuffer.new)
           max_bson_size += MAX_BSON_COMMAND_OVERHEAD
         end

-        final_message.serialize(buffer, max_bson_size)
-        if max_message_size &&
-          (buffer.length - start_size) > max_message_size
-        then
-          raise Error::MaxMessageSize.new(max_message_size)
+        # RUBY-2234: It is necessary to check that the message size does not
+        # exceed the maximum bson object size before compressing and serializing
+        # the final message.
+        #
+        # This is to avoid the case where the user performs a bulk write
+        # larger than 16MiB which, when compressed, becomes smaller than 16MiB.
+        # If the driver does not split the bulk writes prior to compression,
+        # the entire operation will be sent to the server, which will raise an
+        # error because the uncompressed operation exceeds the maximum bson size.
+        #
+        # To address this problem, we serialize the message prior to compression
+        # and raise an exception if the serialized message exceeds the maximum
+        # bson size.
+        if max_message_size
+          # Create a separate buffer that contains the un-compressed message
+          # for the purpose of checking its size. Write any pre-existing contents
+          # from the original buffer into the temporary one.
+          temp_buffer = BSON::ByteBuffer.new
+
+          # TODO: address the fact that this line mutates the buffer.
+          temp_buffer.put_bytes(buffer.get_bytes(buffer.length))
+
+          message.serialize(temp_buffer, max_bson_size)
+          if temp_buffer.length > max_message_size
+            raise Error::MaxMessageSize.new(max_message_size)
+          end
         end
+
+        # RUBY-2335: When the un-compressed message is smaller than the maximum
+        # bson size limit, the message will be serialized twice. The operations
+        # layer should be refactored to allow compression on an already-
+        # serialized message.
+        final_message = message.maybe_compress(compressor, options[:zlib_compression_level])
+        final_message.serialize(buffer, max_bson_size)
+
         buffer
       end
     end
```

Diff for: spec/integration/bulk_write_spec.rb

+19
```diff
@@ -0,0 +1,19 @@
+require 'spec_helper'
+
+describe 'Bulk writes' do
+  before do
+    authorized_collection.drop
+  end
+
+  context 'when bulk write is larger than 48MB' do
+    let(:operations) do
+      [ { insert_one: { text: 'a' * 1000 * 1000 } } ] * 48
+    end
+
+    it 'succeeds' do
+      expect do
+        authorized_collection.bulk_write(operations)
+      end.not_to raise_error
+    end
+  end
+end
```

Diff for: spec/integration/size_limit_spec.rb

-4
```diff
@@ -81,10 +81,6 @@
     end

     it 'allows bulk writes of multiple documents of exactly 16 MiB each' do
-      if SpecConfig.instance.compressors
-        pending "RUBY-2234"
-      end
-
       documents = []
       1.upto(3) do |index|
         document = { key: 'a' * (max_document_size - 28), _id: "in#{index}" }
```

0 commit comments

Comments
 (0)