diff --git a/bson/marshal.go b/bson/marshal.go index 21631d8156..88a85c0ffb 100644 --- a/bson/marshal.go +++ b/bson/marshal.go @@ -74,7 +74,10 @@ func Marshal(val interface{}) ([]byte, error) { } }() sw.Reset() - vw := NewDocumentWriter(sw) + + vw := getDocumentWriter(sw) + defer putDocumentWriter(vw) + enc := encPool.Get().(*Encoder) defer encPool.Put(enc) enc.Reset(vw) diff --git a/bson/unmarshal.go b/bson/unmarshal.go index 72870c10ab..bf97da23b1 100644 --- a/bson/unmarshal.go +++ b/bson/unmarshal.go @@ -43,6 +43,8 @@ type ValueUnmarshaler interface { // pointer, the pointer is set to nil without calling UnmarshalBSONValue. func Unmarshal(data []byte, val interface{}) error { vr := newDocumentReader(bytes.NewReader(data)) + defer releaseDocumentReader(vr) + if l, err := vr.peekLength(); err != nil { return err } else if int(l) != len(data) { diff --git a/bson/value_reader.go b/bson/value_reader.go index 678c47b106..7c26c41411 100644 --- a/bson/value_reader.go +++ b/bson/value_reader.go @@ -13,6 +13,7 @@ import ( "fmt" "io" "math" + "sync" ) var _ ValueReader = &valueReader{} @@ -29,6 +30,20 @@ type vrState struct { end int64 } +var bufioReaderPool = sync.Pool{ + New: func() interface{} { + return bufio.NewReader(nil) + }, +} + +var vrPool = sync.Pool{ + New: func() interface{} { + return &valueReader{ + stack: make([]vrState, 1, 5), + } + }, +} + // valueReader is for reading BSON values. type valueReader struct { r *bufio.Reader @@ -57,14 +72,26 @@ func newValueReader(t Type, r io.Reader) ValueReader { } func newDocumentReader(r io.Reader) *valueReader { - stack := make([]vrState, 1, 5) - stack[0] = vrState{ - mode: mTopLevel, - } - return &valueReader{ - r: bufio.NewReader(r), - stack: stack, - } + vr := vrPool.Get().(*valueReader) + + vr.offset = 0 + vr.frame = 0 + + vr.stack = vr.stack[:1] + vr.stack[0].mode = mTopLevel + + br := bufioReaderPool.Get().(*bufio.Reader) + br.Reset(r) + vr.r = br + + return vr +} + +func releaseDocumentReader(vr *valueReader) { + bufioReaderPool.Put(vr.r) + vr.r = nil + + vrPool.Put(vr) } func (vr *valueReader) advanceFrame() { @@ -253,14 +280,28 @@ func (vr *valueReader) appendNextElement(dst []byte) ([]byte, error) { return nil, err } - buf := make([]byte, length) - _, err = io.ReadFull(vr.r, buf) + buf, err := vr.r.Peek(int(length)) if err != nil { + if err == bufio.ErrBufferFull { + temp := make([]byte, length) + if _, err = io.ReadFull(vr.r, temp); err != nil { + return nil, err + } + dst = append(dst, temp...) + vr.offset += int64(len(temp)) + return dst, nil + } + return nil, err } + dst = append(dst, buf...) - vr.offset += int64(len(buf)) - return dst, err + if _, err = vr.r.Discard(int(length)); err != nil { + return nil, err + } + + vr.offset += int64(length) + return dst, nil } func (vr *valueReader) readValueBytes(dst []byte) (Type, []byte, error) { diff --git a/bson/value_writer.go b/bson/value_writer.go index 57334a925d..74921e4183 100644 --- a/bson/value_writer.go +++ b/bson/value_writer.go @@ -33,6 +33,29 @@ func putValueWriter(vw *valueWriter) { } } +var documentWriterPool = sync.Pool{ + New: func() interface{} { + return NewDocumentWriter(nil) + }, +} + +func getDocumentWriter(w io.Writer) *valueWriter { + vw := documentWriterPool.Get().(*valueWriter) + + vw.reset(vw.buf) + vw.buf = vw.buf[:0] + vw.w = w + + return vw +} + +func putDocumentWriter(vw *valueWriter) { + if vw != nil { + vw.w = nil // don't leak the writer + documentWriterPool.Put(vw) + } +} + // This is here so that during testing we can change it and not require // allocating a 4GB slice. var maxSize = math.MaxInt32