Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions src/Pools/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,20 @@

use Exception;

/**
* @template TResource
*/
class Connection
{
protected string $id = '';

/**
* @var Pool<TResource>|null
*/
protected ?Pool $pool = null;

/**
* @param mixed $resource
* @param TResource $resource
*/
public function __construct(protected mixed $resource)
{
Expand All @@ -27,52 +33,53 @@ public function getID(): string

/**
* @param string $id
* @return self
* @return $this<TResource>
*/
public function setID(string $id): self
public function setID(string $id): static
{
$this->id = $id;
return $this;
}

/**
* @return mixed
* @return TResource
*/
public function getResource(): mixed
{
return $this->resource;
}

/**
* @param mixed $resource
* @return self
* @param TResource $resource
* @return $this<TResource>
*/
public function setResource(mixed $resource): self
public function setResource(mixed $resource): static
{
$this->resource = $resource;
return $this;
}

/**
* @return Pool
* @return Pool<TResource>|null
*/
public function getPool(): ?Pool
{
return $this->pool;
}

/**
* @param Pool $pool
* @return self
* @param Pool<TResource> $pool
* @return $this<TResource>
*/
public function setPool(Pool &$pool): self
public function setPool(Pool $pool): static
{
$this->pool = $pool;
return $this;
}

/**
* @return Pool
* @return Pool<TResource>
* @throws Exception
*/
public function reclaim(): Pool
{
Expand All @@ -84,7 +91,8 @@ public function reclaim(): Pool
}

/**
* @return Pool
* @return Pool<TResource>
* @throws Exception
*/
public function destroy(): Pool
{
Expand Down
48 changes: 26 additions & 22 deletions src/Pools/Group.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,43 +8,44 @@
class Group
{
/**
* @var Pool[]
* @var array<Pool<covariant mixed>>
*/
protected array $pools = [];

/**
* @param Pool $pool
* @return self
* @param Pool<covariant mixed> $pool
* @return static
*/
public function add(Pool $pool): self
public function add(Pool $pool): static
{
$this->pools[$pool->getName()] = $pool;
return $this;
}

/**
* @param string $name
* @return Pool
* @return Pool<covariant mixed>
* @throws Exception
*/
public function get(string $name): Pool
{
return $this->pools[$name] ?? throw new Exception("Pool '{$name}' not found");
return $this->pools[$name] ?? throw new Exception("Pool '$name' not found");
}

/**
* @param string $name
* @return self
* @return static
*/
public function remove(string $name): self
public function remove(string $name): static
{
unset($this->pools[$name]);
return $this;
}

/**
* @return self
* @return static
*/
public function reclaim(): self
public function reclaim(): static
{
foreach ($this->pools as $pool) {
$pool->reclaim();
Expand All @@ -56,9 +57,11 @@ public function reclaim(): self
/**
* Execute a callback with a managed connection
*
* @param string[] $names Name of resources
* @param callable(mixed...): mixed $callback Function that receives the connection resources
* @return mixed Return value from the callback
* @template TReturn
* @param array<string> $names Name of resources
* @param callable(mixed...): TReturn $callback Function that receives the connection resources
* @return TReturn Return value from the callback
* @throws Exception
*/
public function use(array $names, callable $callback): mixed
{
Expand All @@ -71,10 +74,11 @@ public function use(array $names, callable $callback): mixed
/**
* Internal recursive callback for `use`.
*
* @param string[] $names Name of resources
* @param callable(mixed...): mixed $callback Function that receives the connection resources
* @param mixed[] $resources
* @return mixed
* @template TReturn
* @param array<string> $names Name of resources
* @param callable(mixed...): TReturn $callback Function that receives the connection resources
* @param array<mixed> $resources
* @return TReturn
* @throws Exception
*/
private function useInternal(array $names, callable $callback, array $resources = []): mixed
Expand All @@ -90,9 +94,9 @@ private function useInternal(array $names, callable $callback, array $resources

/**
* @param int $reconnectAttempts
* @return self
* @return static
*/
public function setReconnectAttempts(int $reconnectAttempts): self
public function setReconnectAttempts(int $reconnectAttempts): static
{
foreach ($this->pools as $pool) {
$pool->setReconnectAttempts($reconnectAttempts);
Expand All @@ -103,9 +107,9 @@ public function setReconnectAttempts(int $reconnectAttempts): self

/**
* @param int $reconnectSleep
* @return self
* @return static
*/
public function setReconnectSleep(int $reconnectSleep): self
public function setReconnectSleep(int $reconnectSleep): static
{
foreach ($this->pools as $pool) {
$pool->setReconnectSleep($reconnectSleep);
Expand All @@ -114,7 +118,7 @@ public function setReconnectSleep(int $reconnectSleep): self
return $this;
}

public function setTelemetry(Telemetry $telemetry): self
public function setTelemetry(Telemetry $telemetry): static
{
foreach ($this->pools as $pool) {
$pool->setTelemetry($telemetry);
Expand Down
60 changes: 35 additions & 25 deletions src/Pools/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
use Utopia\Telemetry\Gauge;
use Utopia\Telemetry\Histogram;

/**
* @template TResource
*/
class Pool
{
/**
Expand Down Expand Up @@ -46,12 +49,12 @@ class Pool
protected int $retrySleep = 1; // seconds

/**
* @var array<Connection|true>
* @var array<Connection<TResource>|true>
*/
protected array $pool = [];

/**
* @var array<string, Connection>
* @var array<string, Connection<TResource>>
*/
protected array $active = [];

Expand All @@ -67,7 +70,7 @@ class Pool
/**
* @param string $name
* @param int $size
* @param callable $init
* @param callable(): TResource $init
*/
public function __construct(string $name, int $size, callable $init)
{
Expand Down Expand Up @@ -104,9 +107,9 @@ public function getReconnectAttempts(): int

/**
* @param int $reconnectAttempts
* @return self
* @return $this<TResource>
*/
public function setReconnectAttempts(int $reconnectAttempts): self
public function setReconnectAttempts(int $reconnectAttempts): static
{
$this->reconnectAttempts = $reconnectAttempts;
return $this;
Expand All @@ -122,9 +125,9 @@ public function getReconnectSleep(): int

/**
* @param int $reconnectSleep
* @return self
* @return $this<TResource>
*/
public function setReconnectSleep(int $reconnectSleep): self
public function setReconnectSleep(int $reconnectSleep): static
{
$this->reconnectSleep = $reconnectSleep;
return $this;
Expand All @@ -140,9 +143,9 @@ public function getRetryAttempts(): int

/**
* @param int $retryAttempts
* @return self
* @return $this<TResource>
*/
public function setRetryAttempts(int $retryAttempts): self
public function setRetryAttempts(int $retryAttempts): static
{
$this->retryAttempts = $retryAttempts;
return $this;
Expand All @@ -158,15 +161,19 @@ public function getRetrySleep(): int

/**
* @param int $retrySleep
* @return self
* @return $this<TResource>
*/
public function setRetrySleep(int $retrySleep): self
public function setRetrySleep(int $retrySleep): static
{
$this->retrySleep = $retrySleep;
return $this;
}

public function setTelemetry(Telemetry $telemetry): void
/**
* @param Telemetry $telemetry
* @return $this<TResource>
*/
public function setTelemetry(Telemetry $telemetry): static
{
$this->telemetryOpenConnections = $telemetry->createGauge('pool.connection.open.count');
$this->telemetryActiveConnections = $telemetry->createGauge('pool.connection.active.count');
Expand All @@ -183,13 +190,16 @@ public function setTelemetry(Telemetry $telemetry): void
advisory: ['ExplicitBucketBoundaries' => [0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10]],
);
$this->telemetryAttributes = ['pool' => $this->name, 'size' => $this->size];

return $this;
}

/**
* Execute a callback with a managed connection
*
* @param callable(mixed): mixed $callback Function that receives the connection resource
* @return mixed Return value from the callback
* @template T
* @param callable(mixed): T $callback Function that receives the connection resource
* @return T Return value from the callback
*/
public function use(callable $callback): mixed
{
Expand All @@ -213,7 +223,8 @@ public function use(callable $callback): mixed
* 3. If still no connection is available, throw an exception
* 4. If a connection is available, return it
*
* @return Connection
* @return Connection<TResource>
* @throws Exception
* @internal Please migrate to `use`.
*/
public function pop(): Connection
Expand Down Expand Up @@ -275,10 +286,10 @@ public function pop(): Connection
}

/**
* @param Connection $connection
* @return self
* @param Connection<TResource> $connection
* @return $this<TResource>
*/
public function push(Connection $connection): self
public function push(Connection $connection): static
{
try {
$this->pool[] = $connection;
Expand All @@ -299,10 +310,10 @@ public function count(): int
}

/**
* @param Connection|null $connection
* @return self
* @param Connection<TResource>|null $connection
* @return $this<TResource>
*/
public function reclaim(Connection $connection = null): self
public function reclaim(Connection $connection = null): static
{
if ($connection !== null) {
$this->push($connection);
Expand All @@ -316,12 +327,11 @@ public function reclaim(Connection $connection = null): self
return $this;
}


/**
* @param Connection|null $connection
* @return self
* @param Connection<TResource>|null $connection
* @return $this<TResource>
*/
public function destroy(Connection $connection = null): self
public function destroy(Connection $connection = null): static
{
try {
if ($connection !== null) {
Expand Down
3 changes: 3 additions & 0 deletions tests/Pools/ConnectionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

class ConnectionTest extends TestCase
{
/**
* @var Connection<string>
*/
protected Connection $object;

public function setUp(): void
Expand Down
Loading