// Source: afMongo::ConnectionManagerPooled.fan

using concurrent
using afConcurrent
using inet

** Manages a pool of connections. 
** 
** Connections are created on-demand and up to 'minPoolSize' idle connections are kept in the pool. 
** Once 'maxPoolSize' connections are in use, any operation requiring a connection will block for 
** (at most) 'waitQueueTimeout' waiting for an available connection.
** 
** This connection manager is created with the standard [Mongo Connection URL]`http://docs.mongodb.org/manual/reference/connection-string/` in the format:
** 
**   mongodb://[username:password@]host[:port][/[database][?options]]
** 
** Examples:
** 
**   mongodb://localhost:27017
**   mongodb://username:password@example1.com/puppies?maxPoolSize=50
** 
** If connecting to a replica set then multiple hosts (with optional ports) may be specified:
** 
**   mongodb://db1.example.net,db2.example.net:2500/?connectTimeoutMS=30000
** 
** On 'startup()' the hosts are queried to find the primary / master node. 
** All read and write operations are performed on the primary node.
** 
** When a connection to the master node is lost, all hosts are re-queried to find the new master.
** 
** Note this connection manager *is* safe for multi-threaded / web-application use.
const class ConnectionManagerPooled : ConnectionManager {
    // Logger for this class.
    private const Log               log             := Utils.getLog(ConnectionManagerPooled#)
    // Locked by 'startup()'; 'leaseConnection()' refuses to run until it is locked.
    private const OneShotLock       startupLock     := OneShotLock("Connection Pool has been started")
    // Locked by 'shutdown()'; checked by 'startup()' and 'leaseConnection()'.
    private const OneShotLock       shutdownLock    := OneShotLock("Connection Pool has been shutdown")
    // 'true' while a background fail over (see 'failOver()') is running.
    private const AtomicBool        failingOverRef  := AtomicBool(false)
    // Runs the background fail over; created from the ActorPool given to the ctor.
    private const Synchronized      failOverThread
    // Wraps the 'ConnectionManagerPoolState' - all access to the pool lists goes through this.
    private const SynchronizedState connectionState

    ** The host name of the MongoDB server this 'ConnectionManager' connects to.
    ** When connecting to replica sets, this will indicate the primary.
    ** 
    ** This value is unavailable (returns 'null') until 'startup()' is called. 
    override Uri? mongoUrl() { mongoUrlRef.val }
    // Backing store for 'mongoUrl()'; set by 'huntThePrimary()'.
    private const AtomicRef mongoUrlRef := AtomicRef(null)

    ** The default write concern for all write operations. 
    ** Set by specifying the 'w', 'wtimeoutMS' and 'journal' connection string options. 
    ** 
    ** Defaults to '["w": 1, "wtimeout": 0, "j": false]'
    **  - write operations are acknowledged,
    **  - write operations never time out,
    **  - write operations need not be committed to the journal.
    override const Str:Obj? writeConcern := ["w": 1, "wtimeout": 0, "j": false]
    
    ** The default database connections are authenticated against.
    ** 
    ** Set via the `connectionUrl`.
    const Str?  defaultDatabase
    
    ** The default username connections are authenticated with.
    ** 
    ** Set via the `connectionUrl`.
    const Str?  defaultUsername
    
    ** The default password connections are authenticated with.
    ** 
    ** Set via the `connectionUrl`.
    const Str?  defaultPassword
    
    ** The original URL this 'ConnectionManager' was configured with.
    ** May contain authentication details.
    ** 
    **   mongodb://username:password@example1.com/puppies?maxPoolSize=50
    const Uri   connectionUrl
    
    ** The minimum number of database connections this pool should keep open.
    ** They are initially created during 'startup()'.
    ** 
    ** Set via the [minPoolSize]`http://docs.mongodb.org/manual/reference/connection-string/#uri.minPoolSize` connection string option.
    ** Defaults to 1.
    ** 
    **   mongodb://example.com/puppies?minPoolSize=50
    const Int   minPoolSize := 1

    ** The maximum number of database connections this pool is allowed open.
    ** This is the maximum number of concurrent users you expect your application to have.
    ** 
    ** Set via the [maxPoolSize]`http://docs.mongodb.org/manual/reference/connection-string/#uri.maxPoolSize` connection string option.
    ** Defaults to 10.
    ** 
    **   mongodb://example.com/puppies?maxPoolSize=10
    const Int   maxPoolSize := 10
    
    ** The maximum time a thread can wait for a connection to become available.
    ** 
    ** Set via the [waitQueueTimeoutMS]`http://docs.mongodb.org/manual/reference/connection-string/#uri.waitQueueTimeoutMS` connection string option.
    ** Defaults to 15 seconds.
    ** 
    **   mongodb://example.com/puppies?waitQueueTimeoutMS=10
    const Duration  waitQueueTimeout := 15sec

    ** If specified, this is the time to attempt a connection before timing out.
    ** If 'null' (the default) then a system timeout is used.
    ** 
    ** Set via the [connectTimeoutMS]`http://docs.mongodb.org/manual/reference/connection-string/#uri.connectTimeoutMS` connection string option.
    ** 
    **   mongodb://example.com/puppies?connectTimeoutMS=2500
    ** 
    ** Equates to `inet::SocketOptions.connectTimeout`.
    const Duration? connectTimeout
    
    ** If specified, this is the time to attempt a send or receive on a socket before the attempt times out.
    ** 'null' (the default) indicates an infinite timeout.
    ** 
    ** Set via the [socketTimeoutMS]`http://docs.mongodb.org/manual/reference/connection-string/#uri.socketTimeoutMS` connection string option.
    ** 
    **   mongodb://example.com/puppies?socketTimeoutMS=2500
    ** 
    ** Equates to `inet::SocketOptions.receiveTimeout`.
    const Duration? socketTimeout

    ** When the connection pool is shutting down, this is the amount of time to wait for all 
    ** connections to close before they are forcibly closed.
    ** 
    ** Defaults to '2sec'. 
    // NOTE(review): declared nullable, but 'shutdown()' passes it to 'backoffFunc()' whose
    // 'timeout' parameter is a non-null 'Duration' - confirm 'null' is never set here.
    const Duration? shutdownTimeout := 2sec
    
    // Hooks used by 'backoffFunc()' to pick a random slot count and to sleep.
    // They are fields so tests can substitute them (e.g. via the ctor it-block).
    internal const |Range->Int| randomFunc  := |Range r->Int| { r.random }
    internal const |Duration|   sleepFunc   := |Duration napTime| { Actor.sleep(napTime) }
    
    
    ** Create a 'ConnectionManager' from a [Mongo Connection URL]`http://docs.mongodb.org/manual/reference/connection-string/`.
    ** If user credentials are supplied, they are used as default authentication for each connection.
    ** 
    **   conMgr := ConnectionManagerPooled(ActorPool(), `mongodb://localhost:27017`)
    ** 
    ** The following URL options are supported:
    **  - [minPoolSize]`http://docs.mongodb.org/manual/reference/connection-string/#uri.minPoolSize`
    **  - [maxPoolSize]`http://docs.mongodb.org/manual/reference/connection-string/#uri.maxPoolSize`
    **  - [waitQueueTimeoutMS]`http://docs.mongodb.org/manual/reference/connection-string/#uri.waitQueueTimeoutMS`
    **  - [connectTimeoutMS]`http://docs.mongodb.org/manual/reference/connection-string/#uri.connectTimeoutMS`
    **  - [socketTimeoutMS]`http://docs.mongodb.org/manual/reference/connection-string/#uri.socketTimeoutMS`
    **  - [w]`http://docs.mongodb.org/manual/reference/connection-string/#uri.w`
    **  - [wtimeoutMS]`http://docs.mongodb.org/manual/reference/connection-string/#uri.wtimeoutMS`
    **  - [journal]`http://docs.mongodb.org/manual/reference/connection-string/#uri.journal`
    ** 
    ** Any other URL option is logged as a warning and otherwise ignored.
    ** 
    ** URL examples:
    **  - 'mongodb://username:password@example1.com/database?maxPoolSize=50'
    **  - 'mongodb://example2.com?minPoolSize=10&maxPoolSize=50'
    ** 
    ** The user info is split on its *first* colon only, so passwords may themselves contain colons.
    ** 
    ** Throws 'ArgErr' if the URL scheme is not 'mongodb', if a numeric option is out of range, 
    ** if 'minPoolSize' exceeds 'maxPoolSize', or if only one of username / password is supplied. 
    ** 
    ** @see `http://docs.mongodb.org/manual/reference/connection-string/`
    new makeFromUrl(ActorPool actorPool, Uri connectionUrl, |This|? f := null) {
        if (connectionUrl.scheme != "mongodb")
            throw ArgErr(ErrMsgs.connectionManager_badScheme(connectionUrl))

        mongoUrl                := connectionUrl
        this.connectionUrl      = connectionUrl
        this.connectionState    = SynchronizedState(actorPool, ConnectionManagerPoolState#)
        this.failOverThread     = Synchronized(actorPool)
        this.minPoolSize        = mongoUrl.query["minPoolSize"]?.toInt ?: minPoolSize
        this.maxPoolSize        = mongoUrl.query["maxPoolSize"]?.toInt ?: maxPoolSize
        waitQueueTimeoutMs      := mongoUrl.query["waitQueueTimeoutMS"]?.toInt
        connectTimeoutMs        := mongoUrl.query["connectTimeoutMS"]?.toInt
        socketTimeoutMs         := mongoUrl.query["socketTimeoutMS"]?.toInt
        w                       := mongoUrl.query["w"]
        wtimeoutMs              := mongoUrl.query["wtimeoutMS"]?.toInt
        journal                 := mongoUrl.query["journal"]?.toBool

        if (minPoolSize < 0)
            throw ArgErr(ErrMsgs.connectionManager_badInt("minPoolSize", "zero", minPoolSize, mongoUrl))
        if (maxPoolSize < 1)
            throw ArgErr(ErrMsgs.connectionManager_badInt("maxPoolSize", "one", maxPoolSize, mongoUrl))
        if (minPoolSize > maxPoolSize)
            throw ArgErr(ErrMsgs.connectionManager_badMinMaxConnectionSize(minPoolSize, maxPoolSize, mongoUrl))     
        if (waitQueueTimeoutMs != null && waitQueueTimeoutMs < 0)
            throw ArgErr(ErrMsgs.connectionManager_badInt("waitQueueTimeoutMS", "zero", waitQueueTimeoutMs, mongoUrl))
        if (connectTimeoutMs != null && connectTimeoutMs < 0)
            throw ArgErr(ErrMsgs.connectionManager_badInt("connectTimeoutMS", "zero", connectTimeoutMs, mongoUrl))
        if (socketTimeoutMs != null && socketTimeoutMs < 0)
            throw ArgErr(ErrMsgs.connectionManager_badInt("socketTimeoutMS", "zero", socketTimeoutMs, mongoUrl))
        if (wtimeoutMs != null && wtimeoutMs < 0)
            throw ArgErr(ErrMsgs.connectionManager_badInt("wtimeoutMS", "zero", wtimeoutMs, mongoUrl))

        // the URL options are in milliseconds, 'Int.toDuration' expects nanoseconds
        if (waitQueueTimeoutMs != null)
            waitQueueTimeout = (waitQueueTimeoutMs * 1_000_000).toDuration
        if (connectTimeoutMs != null)
            connectTimeout = (connectTimeoutMs * 1_000_000).toDuration
        if (socketTimeoutMs != null)
            socketTimeout = (socketTimeoutMs * 1_000_000).toDuration

        database := mongoUrl.pathStr.trimToNull

        // split the user info on the FIRST colon only - a password may itself contain colons
        // and splitting on every colon would silently truncate it
        username := null as Str
        password := null as Str
        userInfo := mongoUrl.userInfo
        if (userInfo != null) {
            colonIdx := userInfo.index(":")
            if (colonIdx == null)
                username = userInfo.trimToNull
            else {
                username = userInfo[0..<colonIdx].trimToNull
                password = userInfo[colonIdx+1..-1].trimToNull
            }
        }
        
        if ((username == null).xor(password == null))
            throw ArgErr(ErrMsgs.connectionManager_badUsernamePasswordCombo(username, password, mongoUrl))

        if (database != null && database.startsWith("/"))
            database = database[1..-1].trimToNull
        if (username != null && password != null && database == null)
            database = "admin"
        if (username == null && password == null)   // a default database has no meaning without credentials
            database = null
        
        defaultDatabase = database
        defaultUsername = username
        defaultPassword = password
        
        writeConcern := Str:Obj?[:] { it.ordered=true }.add("w", 1).add("wtimeout", 0).add("j", false)
        if (w != null)
            // 'w' may be a number of nodes or a tag such as "majority"
            writeConcern["w"] = Int.fromStr(w, 10, false) != null ? w.toInt : w
        if (wtimeoutMs != null)
            writeConcern["wtimeout"] = wtimeoutMs
        if (journal != null)
            writeConcern["j"] = journal
        this.writeConcern = writeConcern

        // whatever is left in the query after removing the known options is unsupported - warn about it
        query := mongoUrl.query.rw
        query.remove("minPoolSize")
        query.remove("maxPoolSize")
        query.remove("waitQueueTimeoutMS")
        query.remove("connectTimeoutMS")
        query.remove("socketTimeoutMS")
        query.remove("w")
        query.remove("wtimeoutMS")
        query.remove("journal")
        query.each |val, key| {
            log.warn(LogMsgs.connectionManager_unknownUrlOption(key, val, mongoUrl))
        }
        
        // allow the it-block to override the default settings
        // no validation occurs - only used for testing.
        f?.call(this)
    }
    
    ** Creates the initial pool and establishes 'minPoolSize' connections with the server.
    ** 
    ** If a connection URL to a replica set is given (a connection URL with multiple hosts) then 
    ** the hosts are queried to find the primary. The primary is currently used for all read and 
    ** write operations. 
    ** 
    ** Calling 'startup()' a second time is a no-op. Any connections opened before a failure 
    ** are returned to the pool rather than being left checked out.
    override ConnectionManager startup() {
        shutdownLock.check
        if (startupLock.locked)
            return this
        startupLock.lock
        
        huntThePrimary

        // connect x times - hold them all before returning any, so x distinct connections get created
        pool := TcpConnection[,]
        try {
            minPoolSize.times { pool.push(checkOut) }
        } finally {
            // always hand back what we took; should 'checkOut' fail part way through, the
            // connections already leased would otherwise occupy pool slots forever
            while (!pool.isEmpty) {
                checkIn(pool.pop)
            }
        }
        
        return this
    }

    ** Leases a pooled connection to the given function and returns whatever the function returns.
    ** 
    ** Should every connection be in use, a truncated binary exponential backoff algorithm is 
    ** used to wait for one to be returned. A 'MongoErr' is thrown if 'waitQueueTimeout' expires 
    ** during that wait.
    ** 
    ** Leased connections are authenticated against the default credentials.
    ** 
    ** If the function throws, the connection is closed before being handed back and the 
    ** error is re-thrown to the caller.
    override Obj? leaseConnection(|Connection->Obj?| c) {
        if (!startupLock.locked)
            throw MongoErr(ErrMsgs.connectionManager_notStarted)
        shutdownLock.check

        leased := checkOut
        try {
            return c(leased)

        } catch (MongoErr mongoErr) {
            leased.close

            // only fail over when the server says it's no longer the master AND nobody has
            // found a new master yet (i.e. the leased connection still points at the current one)
            notMaster := mongoErr.msg.contains("MongoDB says: not master")
            if (notMaster && leased.mongoUrl == mongoUrl)
                failOver

            // fail over or not, the caller still gets the original error
            throw mongoErr

        } catch (Err cause) {
            // the conversation with the server may have been cut short, leaving the wire 
            // protocol out of step - so the connection can't be trusted anymore
            leased.close
            throw cause

        } finally {
            checkIn(leased)
        }
    }
    
    ** Closes every connection in the pool.
    ** 
    ** Idle connections are closed straight away. Leased connections are given up to 
    ** 'shutdownTimeout' to be returned; once that has passed all remaining connections are 
    ** forcibly closed, in use or not.
    ** 
    ** Does nothing if the pool was never started.
    override ConnectionManager shutdown() {
        if (!startupLock.locked)
            return this
        shutdownLock.lock
        
        // closes the idle connections, answering 'true' once nothing is leased 
        // or 'null' to have 'backoffFunc' try again later
        drainIdle := |->Bool?| {
            stillLeased := connectionState.sync |ConnectionManagerPoolState state -> Int| {
                while (!state.checkedIn.isEmpty) {
                    state.checkedIn.removeAt(0).close 
                }
                return state.checkedOut.size
            }
            if (stillLeased > 0)
                log.info(LogMsgs.connectionManager_waitingForConnectionsToClose(stillLeased, mongoUrl))
            return stillLeased > 0 ? null : true
        }
        
        allClosed := backoffFunc(drainIdle, shutdownTimeout) ?: false

        if (!allClosed) {
            // time's up - close whatever is left
            connectionState.async |ConnectionManagerPoolState state| {
                // some may have been returned since we last looked
                while (!state.checkedIn.isEmpty) {
                    state.checkedIn.removeAt(0).close 
                }
                
                // and these are still out on lease
                while (!state.checkedOut.isEmpty) {
                    state.checkedOut.removeAt(0).close 
                }
            }.get
        }
        
        return this
    }
    
    ** The number of pooled connections that are currently leased out.
    Int noOfConnectionsInUse() {
        leased := connectionState.sync |ConnectionManagerPoolState state->Int| {
            return state.checkedOut.size
        }
        return leased
    }

    ** The total number of connections held by the pool, leased and idle alike.
    Int noOfConnectionsInPool() {
        total := connectionState.sync |ConnectionManagerPoolState state->Int| {
            return state.checkedOut.size + state.checkedIn.size
        }
        return total
    }
    
    ** (Advanced)
    ** Closes all un-leased connections in the pool, and flags all leased connections to close 
    ** themselves after use. Use to migrate connections to new host / master.
    ** 
    ** The pool is then re-primed with 'minPoolSize' fresh connections. Any connections opened 
    ** before a failure are returned to the pool rather than being left checked out.
    Void emptyPool() {
        connectionState.sync |ConnectionManagerPoolState state| {
            while (!state.checkedIn.isEmpty) {
                state.checkedIn.removeAt(0).close 
            }
            state.checkedOut.each { it.forceCloseOnCheckIn = true }
        }
    
        // re-connect x times - hold them all before returning any, so x distinct connections get created
        pool := TcpConnection[,]
        try {
            minPoolSize.times { pool.push(checkOut) }
        } finally {
            // always hand back what we took; should 'checkOut' fail part way through, the
            // connections already leased would otherwise occupy pool slots forever
            while (!pool.isEmpty) {
                checkIn(pool.pop)
            }
        }
    }
    
    ** (Advanced)
    ** Searches the replica set for the Master node and instructs all new connections to connect to it.
    ** Throws 'MongoErr' if a primary can not be found. 
    ** 
    ** On success 'mongoUrl' is updated to point at the primary and the pool's connection 
    ** factory is replaced. Existing connections are left untouched, hence:
    ** 
    ** This method should be followed with a call to 'emptyPool()'.  
    Void huntThePrimary() {
        // the URL host may be a comma separated list of hosts; the URL port is applied to the last one
        // NOTE(review): 'connectionUrl.host' is dereferenced without a null check - confirm it can't be null
        hg      := connectionUrl.host.split(',')
        hostList := (HostDetails[]) hg.map { HostDetails(it) }
        hostList.last.port = connectionUrl.port ?: 27017
        // all known hosts, keyed by "address:port"
        hosts   := Str:HostDetails[:] { it.ordered=true }.addList(hostList) { it.host }
        
        // no primary known yet
        primary := (HostDetails?) null
        
        // let's play hunt the primary! Always check, even if only 1 host is supplied, it may still 
        // be part of a replica set
        // first, check the list of supplied hosts
        primary = hostList.eachWhile |hd->HostDetails?| {
            // Is it? Is it!?
            if (hd.populate.isPrimary)
                return hd

            // now lets contact what it thinks is the primary, to double check
            // assume if it's been contacted, it's not the primary - cos we would have returned it already
            if (hd.primary != null && hosts[hd.primary]?.contacted != true) {
                if (hosts[hd.primary] == null) 
                    hosts[hd.primary] = HostDetails(hd.primary)
                if (hosts[hd.primary].populate.isPrimary)
                    return hosts[hd.primary]
            }

            // keep looking!
            return null
        }

        // the above should have flushed out the primary, but if not, check *all* the returned hosts
        if (primary == null) {
            // add all the hosts to our map
            hostList.each |hd| {
                hd.hosts.each {
                    if (hosts[it] == null)
                        hosts[it] = HostDetails(it)
                }
            }

            // loop through them all, contacting only those we've not yet spoken to
            primary = hosts.find { !it.contacted && it.populate.isPrimary }
        }

        // Bugger!
        if (primary == null)
            throw MongoErr(ErrMsgs.connectionManager_couldNotFindPrimary(connectionUrl))

        // copy into locals so the connection factory closure below doesn't capture 'primary' itself
        primaryAddress  := primary.address
        primaryPort     := primary.port
        
        // remove user credentials and other crud from the url
        mongoUrl := `mongodb://${primaryAddress}:${primaryPort}`
        mongoUrlRef.val = mongoUrl

        // set our connection factory
        connectionState.sync |ConnectionManagerPoolState state| {
            state.connectionFactory = |->Connection| {
                socket := TcpSocket()
                socket.options.connectTimeout = connectTimeout
                socket.options.receiveTimeout = socketTimeout
                return TcpConnection(socket).connect(IpAddr(primaryAddress), primaryPort) {
                    // stamp the connection with the master it was opened against - see 'leaseConnection()'
                    it.mongoUrl = mongoUrl
                }
            } 
        }

        log.info(LogMsgs.connectionManager_foundNewMaster(mongoUrl))
    }
    
    ** Starts a background hunt for a new primary, then refreshes the pool.
    ** Returns immediately; failures are logged, never thrown.
    private Void failOver() {
        // one hunt at a time is plenty
        alreadyHunting := failingOverRef.val == true
        if (alreadyHunting)
            return

        // the flag is only raised inside the async block, so two hunts may occasionally 
        // run back to back - that's harmless
        failOverThread.async |->| {
            failingOverRef.val = true
            try {
                huntThePrimary
                emptyPool
                
                // a new master has been found and the pool refreshed - all done quietly in the background
                
            } catch (Err cause) {
                log.warn("Could not find new Master", cause)

            } finally {
                failingOverRef.val = false
            }
        }
    }
    
    ** Implements a truncated binary exponential backoff algorithm.
    ** 
    ** 'func' is called repeatedly, with the total time slept so far, until it returns a non-null 
    ** value or 'timeout' has been slept through. Between calls the thread sleeps (via 'sleepFunc') 
    ** for a random number (via 'randomFunc') of 10ms slots, where the range of slots doubles on 
    ** each attempt up to a maximum of '2^10'.
    ** 
    ** 'func' is always called at least once, even when 'timeout' is zero, and is always called 
    ** one last time after the final sleep.
    ** 
    ** Returns the non-null result of 'func', or 'null' if the operation timed out.
    ** 
    ** @see `http://en.wikipedia.org/wiki/Exponential_backoff`
    internal Obj? backoffFunc(|Duration totalNapTime->Obj?| func, Duration timeout) {
        maxExponent     := 10
        exponent        := 0
        totalNapTime    := 0ms

        // the first attempt is unconditional - a zero timeout means "don't wait", not "don't try"
        result := func.call(totalNapTime)

        while (result == null && totalNapTime < timeout) {
            if (++exponent > maxExponent) exponent = maxExponent  // truncate the exponentiation ~ 10 secs
            napTime := (randomFunc(0..<2.pow(exponent)) * 10 * 1000000).toDuration

            // don't over sleep!
            if ((totalNapTime + napTime) > timeout)
                napTime = timeout - totalNapTime 

            sleepFunc(napTime)
            totalNapTime += napTime

            // try again - when the timeout has just been reached this is the last ditch attempt
            result = func.call(totalNapTime)
        }
        
        return result
    }

    ** Takes a connection from the pool, creating a new one if the pool has not yet reached 
    ** 'maxPoolSize'. Waits (with backoff) for up to 'waitQueueTimeout' for one to become free.
    ** 
    ** The returned connection is authenticated as the default user, if one was configured.
    ** Should that authentication fail, the connection is closed and handed back to the pool 
    ** before the error is re-thrown, so the pool slot is not lost.
    ** 
    ** Throws 'MongoErr' if no connection could be obtained, or the 'IOErr' raised when 
    ** connecting - in which case a background fail over is also started.
    private TcpConnection checkOut() {
        connectionFunc := |Duration totalNapTime->TcpConnection?| {
            // connections are mutable, so are wrapped in 'Unsafe' to be passed out of the state thread
            con := connectionState.sync |ConnectionManagerPoolState state->Unsafe?| {
                while (!state.checkedIn.isEmpty) {
                    connection := state.checkedIn.pop
                    
                    // check the connection is still alive - the server may have closed it during a fail over
                    if (!connection.isClosed) {
                        state.checkedOut.push(connection)
                        return Unsafe(connection)
                    }
                }

                if (state.checkedOut.size < maxPoolSize) {
                    connection := state.connectionFactory()
                    state.checkedOut.push(connection)
                    return Unsafe(connection)
                }
                
                // pool exhausted - 'null' tells 'backoffFunc' to wait and try again
                return null
            }?->val
            
            // let's not swamp the logs the first time we can't connect
            // 1.5 secs gives at least 6 connection attempts
            if (con == null && totalNapTime > 1.5sec)
                log.warn(LogMsgs.connectionManager_waitingForConnectionsToFree(maxPoolSize, mongoUrl))

            return con
        }

        connection  := null as TcpConnection
        ioErr       := null as Err
        try connection = backoffFunc(connectionFunc, waitQueueTimeout)
        
        // sys::IOErr: Could not connect to MongoDB at `dsXXXXXX-a0.mlab.com:59296` - java.net.ConnectException: Connection refused
        catch (IOErr ioe)
            ioErr = ioe

        if (connection == null || ioErr != null) {
            if (noOfConnectionsInUse == maxPoolSize)
                throw MongoErr("Argh! No more connections! All ${maxPoolSize} are in use!")
            
            // it would appear the database is down ... :(          
            // so lets kick off a game of huntThePrimary in the background ...
            failOver

            // ... and report an error - 'cos we can't wait longer than 'waitQueueTimeout'
            throw ioErr ?: MongoErr("Argh! Can not connect to Master! All ${maxPoolSize} are in use!")
        }
        
        // ensure all connections that are initially leased are authenticated as the default user
        // specifically do the check here so you can always *brute force* an authentication on a connection
        if (defaultDatabase != null && connection.authentications[defaultDatabase] != defaultUsername) {
            try {
                connection.authenticate(defaultDatabase, defaultUsername, defaultPassword)

            } catch (Err authErr) {
                // the connection is already registered as checked out - if we just let the error 
                // escape, nobody would ever check it back in and the pool slot would be lost forever
                connection.close
                checkIn(connection)
                throw authErr
            }
        }
        
        return connection
    }
    
    ** Hands a leased connection back to the pool.
    ** 
    ** Closed connections are simply forgotten. Open ones are kept for re-use, unless they've 
    ** been flagged to close or the pool already holds 'minPoolSize' idle connections - in which 
    ** case they are closed.
    private Void checkIn(TcpConnection connection) {
        wrapped := Unsafe(connection)
        // 'sync()' (not 'async()') so the connection is definitely back in the pool before this thread asks for another
        connectionState.sync |ConnectionManagerPoolState state| {
            returned := (TcpConnection) wrapped.val
            state.checkedOut.removeSame(returned)
            
            // stale (closed) connections are never pooled
            if (!returned.isClosed) {
                // the idle pool is capped at the min pool size
                surplus := returned.forceCloseOnCheckIn || state.checkedIn.size >= minPoolSize
                if (surplus)
                    returned.close
                else
                    state.checkedIn.push(returned)
            }
        }
    }
}

** The mutable state of a 'ConnectionManagerPooled'. 
** Only ever accessed from inside the manager's 'SynchronizedState'.
internal class ConnectionManagerPoolState {
    ** Idle connections, available for lease.
    TcpConnection[]     checkedIn   := [,]
    ** Connections currently leased out.
    TcpConnection[]     checkedOut  := [,]
    ** Creates new connections to the current primary; set by 'huntThePrimary()'.
    |->TcpConnection|?  connectionFactory
}

** What is known about a single MongoDB host while hunting for the primary.
internal class HostDetails {
    static const Log    log := Utils.getLog(HostDetails#)
    ** Host name or IP address.
    Str     address
    ** TCP port.
    Int     port
    ** 'true' once 'populate()' has been called, whether or not the host answered.
    Bool    contacted
    ** 'true' if the host reported itself as master.
    Bool    isPrimary
    ** 'true' if the host reported itself as a secondary.
    Bool    isSecondary
    ** The replica set members as reported by the host; empty for standalone instances.
    Str[]   hosts   := Obj#.emptyList
    ** The primary as reported by the host; 'null' for standalone instances.
    Str?    primary
    
    ** Parses a 'host[:port]' string, defaulting to '127.0.0.1' and port '27017'.
    new make(Str addr) {
        parsed  := `//${addr}`
        address = parsed.host ?: "127.0.0.1"
        port    = parsed.port ?: 27017
    }
    
    ** Connects to the host and runs 'ismaster' to fill in the replica details.
    ** Connection problems are logged and otherwise ignored.
    This populate() {
        contacted = true
        
        probe := TcpConnection()
        try {
            probe.connect(IpAddr(address), port)
            localMgr := ConnectionManagerLocal(probe, "mongodb://${address}:${port}".toUri)
            reply    := Database(localMgr, "admin").runCmd(["ismaster":1])
        
            // comparing with '== true' copes with keys that are missing, as they are in standalone instances
            isPrimary   = reply["ismaster"]  == true
            isSecondary = reply["secondary"] == true
            primary     = reply["primary"]
            hosts       = reply["hosts"] ?: Obj#.emptyList
            
        } catch (Err cause) {
            // a replica being down is not fatal - make a note and let the caller try the next one
            log.warn("Could not connect to Host ${address}:${port} :: ${cause.typeof.name} - ${cause.msg}")

        } finally {
            probe.close
        }
        
        return this
    }
    
    ** The host in 'address:port' notation.
    Str host() { "${address}:${port}" }

    override Str toStr() { host }
}