sourceafMongo::Cursor.fan


** Iterates over database query results. See [Collection.find()]`afMongo::Collection.find`.
**
** @see `http://docs.mongodb.org/manual/core/cursors/`
class Cursor {
    private static const Log    _log            := Utils.getLog(Cursor#)
    private OneShotLock         _querySent  := OneShotLock("Query has been sent to MongoDB")
    private OneShotLock         _deadCursor := OneShotLock("Cursor has been killed")
    private Connection          _connection
    private Namespace           _nsCol
    private Int                 _cursorId
    private [Str:Obj?][]?       _results
    private Int                 _resultIndex
    private Int                 _downloaded
    private Int                 _indexLocal

    ** Use in 'orderBy' maps to denote sort order.
    static const Int ASC        := 1
    
    ** Use in 'orderBy' maps to denote sort order.
    static const Int DESC       := -1
    
    ** The query as used by this cursor. 
    Str:Obj? query {
        private set
    }
    
    ** The number of documents to be returned in each response from the server.
    ** 
    ** Leave as 'null' to use the default size.
    ** 
    ** This value can not be changed once a query has been sent to the server.
    Int? batchSize {
        set { _querySent.check; &batchSize = it }
    }
    
    ** The maximum number of documents this cursor will read.
    ** 
    ** Leave as 'null' or set to zero to read all results from the query.
    ** 
    ** This value can not be changed once a query has been sent to the server.
    Int? limit {
        set { _querySent.check; &limit = it?.max(0) }       
    }
    
    ** The number of documents to omit, when returning the result of the query.
    ** 
    ** Leave as '0' to return all documents.
    ** 
    ** This value can not be changed once the query has been sent to the server.
    Int skip {
        set { _querySent.check; &skip = it.max(0); &index = it }
    }
    
    ** Use to alter / set which fields returned in the Mongo responses.
    ** 
    ** Set field names with a value of '1' to limit returned fields to just those mentioned. Example:
    ** 
    ** pre>
    ** syntax: fantom
    ** cursor.projection = [
    **     "fieldName1" : 1,
    **     "fieldName2" : 1
    ** ]
    ** <pre
    ** 
    ** Would limit the returned fields to just '_id', 'fieldName1', & 'fieldName2'.
    ** 
    ** Leave as 'null' to return all fields. 
    ** 
    ** See [Projection Operators]`https://docs.mongodb.com/manual/reference/operator/projection/` for other uses.
    ** 
    ** This value can not be changed once the query has been sent to the server.
    [Str:Obj?]? projection {
        set { _querySent.check; &projection = it }
    }
    
    ** Optional flags to set in the query. 
    ** 
    ** This value can not be changed once the query has been sent to the server.
    OpQueryFlags flags {
        set { _querySent.check; &flags = it }       
    }
    
    ** A zero based index into the documents returned by the query.
    ** 
    ** pre>
    ** syntax: fantom
    ** cursor.count  // --> 10
    ** cursor.skip = 2
    ** cursor.index  // -->  2
    ** cursor.next
    ** cursor.index  // -->  3
    ** <pre
    Int index { private set }
    
    ** Query modifiers to use. Synonymous to using '_addSpecial()' in the mongo shell.
    ** 
    ** This value can not be changed once the query has been sent to the server.
    ** 
    ** @see `http://docs.mongodb.org/manual/reference/operator/query-modifier/`
    [Str:Obj?] special {
        get { _querySent.locked ? &special.ro : &special}
        set { _querySent.check  ; &special = it }
    }

    ** The name of the index to use for sorting.
    ** 
    ** This value can not be changed once the query has been sent to the server.
    ** 
    ** @see `http://docs.mongodb.org/manual/reference/operator/meta/hint/`
    Str? hint {
        get { special["\$hint"] }       
        set { _querySent.check; special["\$hint"] = it }
    }

    ** Use to order the query results in ascending or descending order.
    ** 
    ** If 'orderBy' is a '[Str:Obj?]' map, it should be a document with field names as keys. 
    ** Values may either be the standard Mongo '1' and '-1' for ascending / descending or the 
    ** strings 'ASC' / 'DESC'.
    ** Should 'orderBy' contain more than 1 entry, it must be ordered.
    ** 
    ** Examples:
    **   syntax: fantom
    ** 
    **   cursor.orderBy = ["age": 1]
    **   cursor.orderBy = [:] { ordered = true }.add("name", "asc").add("age", "desc")
    ** 
    ** This value can not be changed once the query has been sent to the server.
    ** 
    ** @see `http://docs.mongodb.org/manual/reference/operator/meta/orderby/`
    [Str:Obj?]? orderBy {
        get { _querySent.locked ? (([Str:Obj?]?) special["\$orderby"])?.ro : special["\$orderby"] }
        // convert here with no check, 'cos what is invalid today maybe valid tomorrow. 
        set {
            _querySent.check
            if (it.size > 1 && it.ordered == false)
                throw ArgErr(ErrMsgs.cursor_mapNotOrdered(it))
            special["\$orderby"] = Utils.convertAscDesc(it) 
        }
    }
    
    internal new make(Connection connection, Namespace namespace, Str:Obj? query) {
        this._connection = connection       
        this._nsCol     = namespace
        this.query      = query
        this.flags      = OpQueryFlags.none
        this.special    = _cmd
    }

    ** Used from Collection.aggregate()
    internal new makeFromId(Connection connection, Namespace namespace, Str:Obj? query, Int cursorId, [Str:Obj?][] results) {
        this._connection = connection       
        this._nsCol     = namespace
        this.query      = query
        this.flags      = OpQueryFlags.none
        this.special    = _cmd
        
        this._querySent.lock
        this._cursorId  = cursorId
        this._results   = results
    }
    
    ** Returns the next document from the query.
    ** Use with 'hasNext()' to iterate over the results:
    ** 
    ** pre>
    ** syntax: fantom
    ** 
    ** while (cursor.hasNext) {
    **     doc := cursor.next
    **     ...
    ** }
    ** <pre
    ** 
    ** If 'checked' is 'true' and there are no more results to return then an 'MongoCursorErr' is 
    ** thrown, else 'null' is returned.
    [Str:Obj?]? next(Bool checked := true) {
        // leave it to 'getSome' and 'getMore' to do the dead cursor check
        if (_results == null)
            getSome
        else if (_resultIndex >= _results.size) {
            if (_deadCursor.locked)
                return null ?: (checked ? throw MongoCursorErr(ErrMsgs.cursor_noMoreData) : null)
            _getMore(false)
        }
        if (_resultIndex >= _results.size)
            return null ?: (checked ? throw MongoCursorErr(ErrMsgs.cursor_noMoreData) : null)
        result := _results[_resultIndex++]
        index++
        _indexLocal++
        return result
    }

    ** Are more documents to be returned?
    ** Use with 'next()' to iterate over the results:
    ** 
    ** pre>
    ** syntax: fantom
    ** 
    ** while (cursor.hasNext) {
    **     doc := cursor.next
    **     ...
    ** }
    ** <pre
    Bool hasNext() {
        _indexLocal < _maxDownload
    }
    
    ** Return all *remaining* and unread documents as a List.
    ** 
    ** pre>
    ** syntax: fantom
    ** 
    ** cursor.count  // --> 10
    ** cursor.skip = 2
    ** cursor.next
    ** cursor.next
    ** list := cursor.toList
    ** list.size    // -->  6
    ** <pre
    [Str:Obj?][] toList() {
        // if nothing has been returned, ask for some data
        if (!_querySent.locked) {
            flags = OpQueryFlags.exhaust + flags
            batchSize = null
            getSome
        }

        // if all the results have been read, then just return a (subset) of results
        if (_querySent.locked && !isAlive)
            return (_resultIndex == 0) ? _results : _results[_resultIndex..-1]

        // we're in the middle of iterating, so...
        // cull any document already seen
        _results = _results[_resultIndex..-1]
        
        // read the rest
        _getMore(true)
        return _results
    }
    
    ** Returns the maximum number of documents this query can return. 
    ** 'count' is constant for any given query and *is* affected by 'skip' or 'limit'. 
    ** 
    ** @see `http://docs.mongodb.org/manual/reference/command/count/`
    once Int count() {
        _runCmd(_cmd
            .add("count", _nsCol.collectionName)
            .add("query", query)
            .add("hint", orderBy ?: hint)
            .add("limit", limit ?: 0)
            .add("skip", skip)
        )["n"]->toInt
    }

    ** Returns 'true' if the cursor is alive on the server.
    ** 
    ** Note this returns 'false' if a query has not yet been sent to the server.
    Bool isAlive() {
        _querySent.locked && _cursorId != 0
    }
    
    // ---- Helper Methods ------------------------------------------------------------------------

    internal Void kill() {
        if (isAlive) {
            Operation(_connection).killCursors([_cursorId])
            _cursorId = 0
            _deadCursor.lock
        }
    }

    internal Void getSome() {
        // drain isn't passed in, 'cos it may be user set
        drain := &flags.containsAll(OpQueryFlags.exhaust)

        _deadCursor.check
        
        // we want to limit the no. of returned results to the smallest, non-null value
        qlimit := [&batchSize, &limit].sort.find { it != null } ?: 0
        
        // if we're bringing down the entire limit, negate so the server doesn't keep the cursor 
        // open and exhaust *everything*!
        if (qlimit == &limit)
            qlimit = -&limit
        
        reply := Operation(_connection).query(_nsCol.qname, _compileQuery, qlimit, &skip, &projection, &flags)
        _querySent.lock
        _gotSome(reply, false)
        
        // if an 'exhaust' query then gulp down all the server replies
        if (drain) {
            while (isAlive) {
                reply = Operation(_connection).readReply(null)
                _gotSome(reply, true)
            }
        }
    }

    private Void _getMore(Bool drain) {
        _deadCursor.check

        // make sure the cursor doesn't ever bring down more than 'limit'
        qlimit := _getMoreQlimit
        reply := Operation(_connection).getMore(_nsCol.qname, qlimit, _cursorId)
        _gotSome(reply, false)
        
        if (drain) {
            while (isAlive && (_downloaded < _maxDownload)) {
                reply = Operation(_connection).getMore(_nsCol.qname, _getMoreQlimit, _cursorId)
                _gotSome(reply, true)
            }
            // and we're spent!
            kill
        }
    }

    private Void _gotSome(OpReplyResponse reply, Bool append) {
        _cursorId   = reply.cursorId
        _downloaded += reply.documents.size

        if (append) {
            _results.addAll(reply.documents)
        } else {
            _resultIndex    = 0         
            _results    = reply.documents
            if (index != (reply.cursorPos + skip))
                _log.warn(LogMsgs.cursor_indexOutOfSync(index, reply.cursorPos + skip))
        }

        if (!isAlive)
            _deadCursor.lock
    }
    
    private Int _getMoreQlimit() {
        qlimit := batchSize ?: 0
        if (limit != null) {
            qlimit = limit - _downloaded
        }
        return qlimit
    }
    
    private Int _maxDownload() {
        limit ?: count
    }
    
    private [Str:Obj?] _compileQuery() {
        special.isEmpty ? query : _cmd.add("\$query", query).addAll(special)
    }
    
    private Str:Obj? _cmd() {
        Str:Obj?[:] { ordered = true }
    }   
    
    private Str:Obj? _runCmd(Str:Obj? cmd) {
        Operation(_connection).runCommand("${_nsCol.databaseName}.\$cmd", cmd)
    }

    // ---- Obj Overrides -------------------------------------------------------------------------
    
    @NoDoc
    override Str toStr() {
        _nsCol.qname
    }
}