1: <?php
2:
3: namespace Guzzle\Http\Curl;
4:
5: use Guzzle\Common\AbstractHasDispatcher;
6: use Guzzle\Http\Message\RequestInterface;
7:
8: /**
9: * Proxies requests and connections to a pool of internal curl_multi handles. Each recursive call will add requests
10: * to the next available CurlMulti handle.
11: */
12: class CurlMultiProxy extends AbstractHasDispatcher implements CurlMultiInterface
13: {
14: protected $handles = array();
15: protected $groups = array();
16: protected $queued = array();
17: protected $maxHandles;
18:
19: /**
20: * @param int $maxHandles The maximum number of idle CurlMulti handles to allow to remain open
21: */
22: public function __construct($maxHandles = 3)
23: {
24: $this->maxHandles = $maxHandles;
25: // You can get some weird "Too many open files" errors when sending a large amount of requests in parallel.
26: // These two statements autoload classes before a system runs out of file descriptors so that you can get back
27: // valuable error messages if you run out.
28: class_exists('Guzzle\Http\Message\Response');
29: class_exists('Guzzle\Http\Exception\CurlException');
30: }
31:
32: /**
33: * {@inheritdoc}
34: */
35: public function add(RequestInterface $request)
36: {
37: $this->queued[] = $request;
38:
39: return $this;
40: }
41:
42: /**
43: * {@inheritdoc}
44: */
45: public function all()
46: {
47: $requests = $this->queued;
48: foreach ($this->handles as $handle) {
49: $requests = array_merge($requests, $handle->all());
50: }
51:
52: return $requests;
53: }
54:
55: /**
56: * {@inheritdoc}
57: */
58: public function remove(RequestInterface $request)
59: {
60: foreach ($this->queued as $i => $r) {
61: if ($request === $r) {
62: unset($this->queued[$i]);
63: return true;
64: }
65: }
66:
67: foreach ($this->handles as $handle) {
68: if ($handle->remove($request)) {
69: return true;
70: }
71: }
72:
73: return false;
74: }
75:
76: /**
77: * {@inheritdoc}
78: */
79: public function reset($hard = false)
80: {
81: $this->queued = array();
82: $this->groups = array();
83: foreach ($this->handles as $handle) {
84: $handle->reset();
85: }
86: if ($hard) {
87: $this->handles = array();
88: }
89:
90: return $this;
91: }
92:
93: /**
94: * {@inheritdoc}
95: */
96: public function send()
97: {
98: if ($this->queued) {
99: $group = $this->getAvailableHandle();
100: // Add this handle to a list of handles than is claimed
101: $this->groups[] = $group;
102: while ($request = array_shift($this->queued)) {
103: $group->add($request);
104: }
105: $group->send();
106: array_pop($this->groups);
107: $this->cleanupHandles();
108: }
109: }
110:
111: /**
112: * {@inheritdoc}
113: */
114: public function count()
115: {
116: return count($this->all());
117: }
118:
119: /**
120: * Get an existing available CurlMulti handle or create a new one
121: *
122: * @return CurlMulti
123: */
124: protected function getAvailableHandle()
125: {
126: // Grab a handle that is not claimed
127: foreach ($this->handles as $h) {
128: if (!in_array($h, $this->groups, true)) {
129: return $h;
130: }
131: }
132:
133: // All are claimed, so create one
134: $handle = new CurlMulti();
135: $handle->setEventDispatcher($this->getEventDispatcher());
136: $this->handles[] = $handle;
137:
138: return $handle;
139: }
140:
141: /**
142: * Trims down unused CurlMulti handles to limit the number of open connections
143: */
144: protected function cleanupHandles()
145: {
146: if ($diff = max(0, count($this->handles) - $this->maxHandles)) {
147: for ($i = count($this->handles) - 1; $i > 0 && $diff > 0; $i--) {
148: if (!count($this->handles[$i])) {
149: unset($this->handles[$i]);
150: $diff--;
151: }
152: }
153: $this->handles = array_values($this->handles);
154: }
155: }
156: }
157: