sourceafMongo::ConnectionManagerPooled.fan

using concurrent
using afConcurrent
using inet

** Manages a pool of connections. 
** 
** Connections are created on-demand and kept in a pool when idle. 
** 
** Note this connection manager is safe for multi-threaded / web-application use.
//** Once the pool is exhausted, any operation requiring a connection will block (for 'maxWaitTime') 
//** waiting for an available connection.
const class ConnectionManagerPooled : ConnectionManager {
    private const SynchronizedState connectionState
    private const OneShotLock       shutdownLock
    
    ** The maximum number of database connections this pool should open.
    ** Set it to the number of concurrent users you expect to use your application.
    const Int       maxNoOfConnections  := 10

    ** The maximum time a thread may wait for a connection to become available.
//  const Duration  maxWaitTime         := 10sec
    
    new make(ActorPool actorPool, |->Connection| connectionFactory, |This|? f := null) {
        f?.call(this)   
        this.connectionState    = SynchronizedState(actorPool, ConnectionManagerPoolState#)
        this.shutdownLock       = OneShotLock("Connection Pool has been shutdown")
        
        // given it's only ever going to be used inside the state thread, it should be safe to unsafe it over
        sFactory := Unsafe(connectionFactory).toImmutable
        connectionState.withState |ConnectionManagerPoolState state| {
            state.connectionFactory = sFactory.val
        }.get
    }
    
    new makeWithIpAddr(ActorPool actorPool, IpAddr ipAddr := IpAddr("127.0.0.1"), Int port := 27017, SocketOptions? options := null, |This|? f := null) : this.make(actorPool, |->Connection| { TcpConnection(ipAddr, port, options) }) { }
    
    @NoDoc  // nothing interesting to add here
    override Obj? leaseConnection(|Connection->Obj?| c) {
        connection := checkOut
        try {
            obj := c(connection)
            return obj
        } finally {
            checkIn(connection)
        }
    }

    @NoDoc  // nothing interesting to add here
    override Void shutdown() {
        shutdownLock.lock
        
        // TODO: wait for used sockets to be checked in
        connectionState.withState |ConnectionManagerPoolState state| {
            state.connectionFactory = null
            
            state.checkedIn.each { it.close }
            state.checkedIn.clear

            // TODO: Wait!
            state.checkedOut.each { it.close }
            state.checkedOut.clear
        }
    }
    
    private Connection checkOut() {
        shutdownLock.check
        // TODO: log warning if all in use, and set timeout for max wait and re-tries

//      default wait time = 200ms -> is an eternity for computers, tiny for humans. set as a public NoDoc field 
        
        return (Connection) connectionState.getState |ConnectionManagerPoolState state->Unsafe?| {
            if (!state.checkedIn.isEmpty) {
                connection := state.checkedIn.pop
                state.checkedOut.push(connection)
                return Unsafe(connection)
            }
            
            if (state.checkedOut.size >= maxNoOfConnections)
                // TODO: return empty handed & wait for a free one
                throw MongoErr("Argh! No more connections! All ${maxNoOfConnections} are in use!")
            
            connection := state.connectionFactory()
            state.checkedOut.push(connection)
            return Unsafe(connection)
        }?->val
    }

    private Void checkIn(Connection connection) {
        unsafeConnection := Unsafe(connection)
        connectionState.withState |ConnectionManagerPoolState state| {
            conn := (Connection) unsafeConnection.val
            state.checkedOut.removeSame(conn)
            
            // make sure we don't save stale connections
            if (!conn.isClosed)
                state.checkedIn.push(conn)

        // call get() to make sure this thread checks in before it asks for a new one
        }.get   
    }
}

internal class ConnectionManagerPoolState {
    Connection[]    checkedOut  := [,]
    Connection[]    checkedIn   := [,]
    |->Connection|? connectionFactory
}