1: <?php
2:
3: namespace Guzzle\Http\Curl;
4:
5: use Guzzle\Common\AbstractHasDispatcher;
6: use Guzzle\Http\Exception\MultiTransferException;
7: use Guzzle\Http\Exception\CurlException;
8: use Guzzle\Http\Message\RequestInterface;
9:
10: 11: 12:
/**
 * Sends one or more requests in parallel through a single cURL multi handle.
 *
 * Requests are attached with add(), transferred with send() and detached
 * again once they complete or fail. Failures are buffered while the transfer
 * runs and are reported together in one MultiTransferException.
 */
class CurlMulti extends AbstractHasDispatcher implements CurlMultiInterface
{
    /**
     * @var resource cURL multi handle returned by curl_multi_init()
     */
    protected $multiHandle;

    /**
     * @var array Requests currently attached to this object, in insertion order
     */
    protected $requests;

    /**
     * @var \SplObjectStorage Maps each request to the CurlHandle wrapper created for it
     */
    protected $handles;

    /**
     * @var array Maps the integer value of a cURL easy handle to the request that owns it
     */
    protected $resourceHash;

    /**
     * @var array Buffered failures; each entry is array('request' => ..., 'exception' => ...)
     */
    protected $exceptions = array();

    /**
     * @var array Requests that finished without an exception being raised
     */
    protected $successful = array();

    /**
     * @var array Maps a CURLM_* error code to array(constant name, human readable message)
     */
    protected $multiErrors = array(
        CURLM_BAD_HANDLE      => array('CURLM_BAD_HANDLE', 'The passed-in handle is not a valid CURLM handle.'),
        CURLM_BAD_EASY_HANDLE => array('CURLM_BAD_EASY_HANDLE', "An easy handle was not good/valid. It could mean that it isn't an easy handle at all, or possibly that the handle already is in used by this or another multi handle."),
        CURLM_OUT_OF_MEMORY   => array('CURLM_OUT_OF_MEMORY', 'You are doomed.'),
        CURLM_INTERNAL_ERROR  => array('CURLM_INTERNAL_ERROR', 'This can only be returned if libcurl bugs. Please report it to us!')
    );

    /**
     * Creates the underlying cURL multi handle and initialises internal state.
     *
     * @throws CurlException if curl_multi_init() fails
     */
    public function __construct()
    {
        $this->multiHandle = curl_multi_init();

        if ($this->multiHandle === false) {
            throw new CurlException('Unable to create multi handle');
        }

        $this->reset();
    }

    /**
     * Closes the cURL multi handle if it is still an open resource.
     */
    public function __destruct()
    {
        if (is_resource($this->multiHandle)) {
            curl_multi_close($this->multiHandle);
        }
    }

    /**
     * Attaches a request and prepares it for transfer.
     *
     * The ADD_REQUEST event is dispatched after the request has been prepared,
     * even when preparation removed the request again.
     *
     * @param RequestInterface $request Request to attach
     *
     * @return self
     */
    public function add(RequestInterface $request)
    {
        $this->requests[] = $request;

        // beforeSend() may detach the request again, e.g. when a listener
        // changes its state or when creating its cURL handle fails
        $this->beforeSend($request);
        $this->dispatch(self::ADD_REQUEST, array('request' => $request));

        return $this;
    }

    /**
     * Returns every request that is currently attached.
     *
     * @return array
     */
    public function all()
    {
        return $this->requests;
    }

    /**
     * Detaches a request and releases its cURL handle.
     *
     * The cURL handle is released even if the request is not in the list of
     * attached requests. REMOVE_REQUEST is dispatched only when the request
     * was actually found.
     *
     * @param RequestInterface $request Request to detach
     *
     * @return bool true if the request was attached and has been removed
     */
    public function remove(RequestInterface $request)
    {
        $this->removeHandle($request);
        foreach ($this->requests as $i => $r) {
            if ($request === $r) {
                unset($this->requests[$i]);
                // Re-index so the list stays a gap-free, zero-based array
                $this->requests = array_values($this->requests);
                $this->dispatch(self::REMOVE_REQUEST, array('request' => $request));

                return true;
            }
        }

        return false;
    }

    /**
     * Detaches every request and clears all buffered state.
     *
     * @param bool $hard Currently unused by this implementation
     */
    public function reset($hard = false)
    {
        // Remove each request individually so that handles are released and
        // REMOVE_REQUEST is dispatched for every one of them
        if ($this->requests) {
            foreach ($this->requests as $request) {
                $this->remove($request);
            }
        }

        $this->handles = new \SplObjectStorage();
        $this->requests = $this->resourceHash = $this->exceptions = $this->successful = array();
    }

    /**
     * Transfers all attached requests, then resets this object.
     *
     * @throws MultiTransferException if at least one failure was buffered during the transfer
     */
    public function send()
    {
        $this->perform();

        // Snapshot the results first: reset() wipes both buffers
        $exceptions = $this->exceptions;
        $successful = $this->successful;
        $this->reset();

        if ($exceptions) {
            $this->throwMultiException($exceptions, $successful);
        }
    }

    /**
     * Returns the number of attached requests.
     *
     * @return int
     */
    public function count()
    {
        return count($this->requests);
    }

    /**
     * Builds and throws a MultiTransferException describing a finished transfer.
     *
     * @param array $exceptions Entries of array('request' => ..., 'exception' => ...)
     * @param array $successful Requests that completed successfully
     *
     * @throws MultiTransferException always
     */
    protected function throwMultiException(array $exceptions, array $successful)
    {
        $multiException = new MultiTransferException('Errors during multi transfer');

        while ($e = array_shift($exceptions)) {
            $multiException->add($e['exception']);
            $multiException->addFailedRequest($e['request']);
        }

        // Only report a request as successful if the exception does not
        // already contain it
        foreach ($successful as $request) {
            if (!$multiException->containsRequest($request)) {
                $multiException->addSuccessfulRequest($request);
            }
        }

        throw $multiException;
    }

    /**
     * Prepares a request for transfer and attaches its handle to the multi handle.
     *
     * Any exception raised while preparing the request is buffered through
     * removeErroredRequest() instead of being propagated.
     *
     * @param RequestInterface $request Request to prepare
     */
    protected function beforeSend(RequestInterface $request)
    {
        try {
            // When both headers are present, Content-Length wins
            if ($request->hasHeader('Transfer-Encoding') && $request->hasHeader('Content-Length')) {
                $request->removeHeader('Transfer-Encoding');
            }
            $request->setState(RequestInterface::STATE_TRANSFER);
            $request->dispatch('request.before_send', array('request' => $request));
            if ($request->getState() != RequestInterface::STATE_TRANSFER) {
                // A listener moved the request out of the transfer state, so
                // it must not be sent over the wire
                $this->remove($request);
                if ($request->getState() == RequestInterface::STATE_COMPLETE) {
                    $this->successful[] = $request;
                }
            } else {
                // Create the easy handle and attach it to the multi handle
                $this->checkCurlResult(curl_multi_add_handle($this->multiHandle, $this->createCurlHandle($request)->getHandle()));
            }
        } catch (\Exception $e) {
            // Buffer the failure and detach the request
            $this->removeErroredRequest($request, $e);
        }
    }

    /**
     * Creates the CurlHandle wrapper for a request and records it in both lookup maps.
     *
     * @param RequestInterface $request Request to create a handle for
     *
     * @return CurlHandle
     */
    protected function createCurlHandle(RequestInterface $request)
    {
        $wrapper = CurlHandle::factory($request);
        $this->handles[$request] = $wrapper;
        $this->resourceHash[(int) $wrapper->getHandle()] = $request;

        return $wrapper;
    }

    /**
     * Runs the transfer loop until no attached requests remain.
     */
    protected function perform()
    {
        if (!$this->requests) {
            return;
        }

        // Kick off the transfers with a very short select timeout
        $active = $mrc = null;
        $this->executeHandles($active, $mrc, 0.001);
        $event = array('curl_multi' => $this);

        while (1) {
            // Handle every transfer that has finished so far
            $this->processMessages();

            // Finished requests are detached, so an empty list means done
            if (!$this->requests) {
                break;
            }

            // Notify each remaining request that it is being polled and count
            // how many of them carry the BLOCKING param
            $blocking = $total = 0;
            foreach ($this->requests as $request) {
                $event['request'] = $request;
                $request->dispatch(self::POLLING_REQUEST, $event);
                ++$total;

                if ($request->getParams()->hasKey(self::BLOCKING)) {
                    ++$blocking;
                }
            }

            if ($blocking == $total) {
                // Every remaining request is flagged as blocking: pause
                // briefly instead of driving cURL
                usleep(500);
            } else {
                do {
                    $this->executeHandles($active, $mrc, 1);
                } while ($active);
            }
        }
    }

    /**
     * Drains the multi handle's message queue and processes each finished transfer.
     *
     * @throws MultiTransferException if processing a response raises one
     */
    private function processMessages()
    {
        while ($done = curl_multi_info_read($this->multiHandle)) {
            // Resolve the request before entering the try block so that the
            // catch branches below never act on an undefined request or on
            // one left over from a previous iteration
            $key = (int) $done['handle'];
            if (!isset($this->resourceHash[$key])) {
                // The handle is not tracked by this object; nothing to process
                continue;
            }
            $request = $this->resourceHash[$key];

            try {
                $this->processResponse($request, $this->handles[$request], $done);
                $this->successful[] = $request;
            } catch (MultiTransferException $e) {
                // Detach the request without buffering, then let the
                // aggregate exception propagate unchanged
                $this->removeErroredRequest($request, $e, false);
                throw $e;
            } catch (\Exception $e) {
                $this->removeErroredRequest($request, $e);
            }
        }
    }

    /**
     * Drives the multi handle once and waits for activity on its connections.
     *
     * @param int   $active  Receives the active flag reported by curl_multi_exec()
     * @param int   $mrc     Receives the last curl_multi_exec() return code
     * @param float $timeout Timeout in seconds passed to curl_multi_select()
     *
     * @throws CurlException if curl_multi_exec() reports an error code
     */
    private function executeHandles(&$active, &$mrc, $timeout = 1)
    {
        do {
            $mrc = curl_multi_exec($this->multiHandle, $active);
        } while ($mrc == CURLM_CALL_MULTI_PERFORM && $active);
        $this->checkCurlResult($mrc);

        if ($active && $mrc == CURLM_OK && curl_multi_select($this->multiHandle, $timeout) == -1) {
            // curl_multi_select() returned -1 and therefore did not wait;
            // sleep briefly so that callers looping on $active do not spin
            usleep(100);
        }
    }

    /**
     * Detaches a failed request, marks it as errored and dispatches MULTI_EXCEPTION.
     *
     * @param RequestInterface $request Request that failed
     * @param \Exception       $e       Exception that was raised
     * @param bool             $buffer  Whether to add the failure to the buffered exceptions
     */
    protected function removeErroredRequest(RequestInterface $request, \Exception $e = null, $buffer = true)
    {
        if ($buffer) {
            $this->exceptions[] = array('request' => $request, 'exception' => $e);
        }

        $this->remove($request);
        $request->setState(RequestInterface::STATE_ERROR);
        $this->dispatch(self::MULTI_EXCEPTION, array('exception' => $e, 'all_exceptions' => $this->exceptions));
    }

    /**
     * Updates a request from its finished transfer and settles its final state.
     *
     * @param RequestInterface $request Request whose transfer finished
     * @param CurlHandle       $handle  Handle that performed the transfer
     * @param array            $curl    Message returned by curl_multi_info_read()
     *
     * @throws CurlException if the transfer failed and the request is still in
     *                       the error state after 'request.exception' listeners ran
     */
    protected function processResponse(RequestInterface $request, CurlHandle $handle, array $curl)
    {
        // Copy transfer information from the handle onto the request
        $handle->updateRequestFromTransfer($request);

        // Build the exception (if any) before the handle is closed below
        $curlException = $this->isCurlException($request, $handle, $curl);

        // Always release the handle, whether the transfer succeeded or not
        $this->removeHandle($request);

        if (!$curlException) {
            $request->setState(RequestInterface::STATE_COMPLETE, array('handle' => $handle));
            // Keep the request attached only if it is back in the transfer state
            if ($request->getState() != RequestInterface::STATE_TRANSFER) {
                $this->remove($request);
            }
        } else {
            $request->setState(RequestInterface::STATE_ERROR);

            // Pass the failed request itself to listeners, not this CurlMulti
            $request->dispatch('request.exception', array(
                'request'   => $request,
                'exception' => $curlException
            ));

            // Listeners may have changed the state of the request
            $state = $request->getState();
            if ($state != RequestInterface::STATE_TRANSFER) {
                $this->remove($request);
            }

            if ($state == RequestInterface::STATE_ERROR) {
                // Nothing recovered the request, so surface the failure
                throw $curlException;
            }
        }
    }

    /**
     * Releases the cURL handle associated with a request, if there is one.
     *
     * @param RequestInterface $request Request whose handle should be released
     */
    protected function removeHandle(RequestInterface $request)
    {
        if (isset($this->handles[$request])) {
            $handle = $this->handles[$request];
            unset($this->handles[$request]);
            unset($this->resourceHash[(int) $handle->getHandle()]);
            curl_multi_remove_handle($this->multiHandle, $handle->getHandle());
            $handle->close();
        }
    }

    /**
     * Builds a CurlException for a finished transfer that reported an error.
     *
     * @param RequestInterface $request Request that was transferred
     * @param CurlHandle       $handle  Handle that performed the transfer
     * @param array            $curl    Message returned by curl_multi_info_read()
     *
     * @return CurlException|bool The exception, or false if the result code is
     *                            CURLM_OK or CURLM_CALL_MULTI_PERFORM
     */
    private function isCurlException(RequestInterface $request, CurlHandle $handle, array $curl)
    {
        if (CURLM_OK == $curl['result'] || CURLM_CALL_MULTI_PERFORM == $curl['result']) {
            return false;
        }

        $handle->setErrorNo($curl['result']);
        $e = new CurlException(sprintf('[curl] %s: %s [url] %s',
            $handle->getErrorNo(), $handle->getError(), $handle->getUrl()));
        $e->setCurlHandle($handle)
            ->setRequest($request)
            ->setCurlInfo($handle->getInfo())
            ->setError($handle->getError(), $handle->getErrorNo());

        return $e;
    }

    /**
     * Throws if a curl_multi_* return code signals an error.
     *
     * @param int $code Return code to check
     *
     * @throws CurlException if the code is neither CURLM_OK nor CURLM_CALL_MULTI_PERFORM
     */
    private function checkCurlResult($code)
    {
        if ($code != CURLM_OK && $code != CURLM_CALL_MULTI_PERFORM) {
            throw new CurlException(isset($this->multiErrors[$code])
                ? "cURL error: {$code} ({$this->multiErrors[$code][0]}): cURL message: {$this->multiErrors[$code][1]}"
                : 'Unexpected cURL error: ' . $code
            );
        }
    }
}
446: