sourceafMongo::ConnectionManagerPooled.fan

using concurrent
using afConcurrent
using inet

** Manages a pool of connections. 
** 
** Connections are created on-demand and kept in a pool when idle. 
** 
** Note this connection manager *is* safe for multi-threaded / web-application use.
//** Once the pool is exhausted, any operation requiring a connection will block (for 'maxWaitTime') 
//** waiting for an available connection.
const class ConnectionManagerPooled : ConnectionManager {
    private const OneShotLock       startupLock     := OneShotLock("Connection Pool has been started")
    private const OneShotLock       shutdownLock    := OneShotLock("Connection Pool has been shutdown")
    private const SynchronizedState connectionState
    
    ** The host name of the MongoDB server this 'ConnectionManager' connects to.
    override const Uri mongoUri
    
    ** The URI this 'ConnectionManager' was configured with.
    ** 
    **   `mongodb://username:password@example1.com/puppies?maxPoolSize=50`
    const Uri   connectionUri
    
    ** The minimum number of database connections this pool should keep open.
    ** They are initially created during 'startup()'.
    const Int   minPoolSize := 0

    ** The maximum number of database connections this pool should open.
    ** Set it to the number of concurrent users you expect to use your application.
    const Int   maxPoolSize := 10

    ** The default database connections are authenticated against.
    const Str?  defaultDatabase
    
    ** The default username connections are authenticated with.
    const Str?  defaultUsername
    
    ** The default password connections are authenticated with.
    const Str?  defaultPassword
    
    ** The maximum time a thread may wait for a connection to become available.
//  const Duration  maxWaitTime         := 10sec

    ** Create a 'ConnectionManager' from a [Mongo Connection URI]`http://docs.mongodb.org/manual/reference/connection-string/`.
    ** If user credentials are supplied, they are used as default authentication for each connection.
    ** 
    ** The following Uri options are supported:
    **  - [minPoolSize]`http://docs.mongodb.org/manual/reference/connection-string/#uri.minPoolSize`
    **  - [maxPoolSize]`http://docs.mongodb.org/manual/reference/connection-string/#uri.maxPoolSize`
    ** 
    ** TODO: connectTimeoutMS
    ** TODO: socketTimeoutMS
    ** TODO: Write Concern Options
    ** 
    ** URI examples:
    **   `mongodb://username:password@example1.com/database?maxPoolSize=50`
    **   `mongodb://example2.com?minPoolSize=10&maxPoolSize=50`
    ** 
    ** @see `http://docs.mongodb.org/manual/reference/connection-string/`
    new makeFromUri(ActorPool actorPool, Uri connectionUri) {
        if (connectionUri.scheme != "mongodb")
            throw ArgErr(ErrMsgs.connectionManager_badScheme(connectionUri))
        
        this.mongoUri           = connectionUri
        this.connectionUri      = connectionUri
        this.connectionState    = SynchronizedState(actorPool, ConnectionManagerPoolState#)
        this.minPoolSize        = mongoUri.query["minPoolSize"]?.toInt ?: minPoolSize
        this.maxPoolSize        = mongoUri.query["maxPoolSize"]?.toInt ?: maxPoolSize
        
        if (minPoolSize < 0)
            throw ArgErr(ErrMsgs.connectionManager_badMinConnectionSize(minPoolSize, mongoUri))
        if (maxPoolSize < 1)
            throw ArgErr(ErrMsgs.connectionManager_badMaxConnectionSize(maxPoolSize, mongoUri))
        if (minPoolSize > maxPoolSize)
            throw ArgErr(ErrMsgs.connectionManager_badMinMaxConnectionSize(minPoolSize, maxPoolSize, mongoUri))     

        address  := mongoUri.host ?: "127.0.0.1"
        port     := mongoUri.port ?: 27017
        database := trimToNull(mongoUri.pathOnly.toStr)
        username := trimToNull(mongoUri.userInfo?.split(':')?.getSafe(0))
        password := trimToNull(mongoUri.userInfo?.split(':')?.getSafe(1))
        
        if ((username == null).xor(password == null))
            throw ArgErr(ErrMsgs.connectionManager_badUsernamePasswordCombo(username, password, mongoUri))

        if (database != null && database.startsWith("/"))
            database = trimToNull(database[1..-1])
        if (username != null && password != null && database == null)
            database = "admin"
        if (username == null && password == null)   // a default database has no meaning without credentials
            database = null
        
        defaultDatabase = database
        defaultUsername = username
        defaultPassword = password
        connectionState.withState |ConnectionManagerPoolState state| {
            state.connectionFactory = |->Connection| {
                TcpConnection(IpAddr(address), port)
            }
        }.get
        
        // remove user credentials and other crud from the uri
        mongoUri = `mongodb://${address}:${port}`
    }
    
    ** Makes a connection available to the given function.
    ** 
    ** All leased connections are authenticated against the default credentials.
    override Obj? leaseConnection(|Connection->Obj?| c) {
        connection := checkOut
        try {
            return c(connection)

        } catch (Err err) {
            // if something dies, kill the connection.
            // we may have died part way through talking with the server meaning our communication 
            // protocols are out of sync - rendering any future use of the connection useless.
            connection.close
            throw err
            
        } finally {
            checkIn(connection)
        }
    }
    
    ** Creates the initial pool and establishes 'minPoolSize' connections with the server.
    override ConnectionManager startup() {
        if (startupLock.locked)
            return this
        startupLock.lock

        // connect x times
        (1..minPoolSize).toList.map { checkOut }.each { checkIn(it) }
        
        return this
    }

    ** Closes all connections.
    override ConnectionManager shutdown() {
        shutdownLock.lock
        
        // TODO: wait for used sockets to be checked in
        connectionState.withState |ConnectionManagerPoolState state| {
            state.connectionFactory = null

            state.checkedIn.each { it.close }
            state.checkedIn.clear

            // TODO: Wait!
            state.checkedOut.each { it.close }
            state.checkedOut.clear
        }.get

        return this
    }
    
    private Connection checkOut() {
        shutdownLock.check
        // TODO: log warning if all in use, and set timeout for max wait and re-tries

//      default wait time = 200ms -> is an eternity for computers, tiny for humans. set as a public NoDoc field 
        
        connection := (Connection) connectionState.getState |ConnectionManagerPoolState state->Unsafe?| {
            if (!state.checkedIn.isEmpty) {
                connection := state.checkedIn.pop
                state.checkedOut.push(connection)
                return Unsafe(connection)
            }
            
            if (state.checkedOut.size >= maxPoolSize)
                // TODO: return empty handed & wait for a free one
                throw MongoErr("Argh! No more connections! All ${maxPoolSize} are in use!")
            
            connection := state.connectionFactory()
            state.checkedOut.push(connection)
            return Unsafe(connection)
        }?->val
        
        // ensure all connections are initially leased authenticated as the default user
        // specifically do the check here so you can always *brute force* an authentication on a connection
        if (defaultDatabase != null && connection.authentications[defaultDatabase] != defaultUsername)
            connection.authenticate(defaultDatabase, defaultUsername, defaultPassword)
        
        return connection
    }

    private Void checkIn(Connection connection) {
        unsafeConnection := Unsafe(connection)
        connectionState.withState |ConnectionManagerPoolState state| {
            conn := (Connection) unsafeConnection.val
            state.checkedOut.removeSame(conn)
            
            // make sure we don't save stale connections
            if (!conn.isClosed)
                state.checkedIn.push(conn)

        // call get() to make sure this thread checks in before it asks for a new one
        }.get   
    }
    
    private Str? trimToNull(Str? str) {
        (str?.trim?.isEmpty ?: true) ? null : str.trim
    }
}

internal class ConnectionManagerPoolState {
    Connection[]    checkedOut  := [,]
    Connection[]    checkedIn   := [,]
    |->Connection|? connectionFactory
}