Overview

Namespaces

  • Contrib
    • Bundle
      • CoverallsBundle
        • Console
        • Entity
      • CoverallsV1Bundle
        • Api
        • Collector
        • Command
        • Config
        • Entity
          • Git
    • Component
      • File
      • Log
      • System
        • Git
  • Guzzle
    • Batch
      • Exception
    • Cache
    • Common
      • Exception
    • Http
      • Curl
      • Exception
      • Message
      • QueryAggregator
    • Inflection
    • Iterator
    • Log
    • Parser
      • Cookie
      • Message
      • UriTemplate
      • Url
    • Plugin
      • Async
      • Backoff
      • Cache
      • Cookie
        • CookieJar
        • Exception
      • CurlAuth
      • ErrorResponse
        • Exception
      • History
      • Log
      • Md5
      • Mock
      • Oauth
    • Service
      • Builder
      • Command
        • Factory
        • LocationVisitor
          • Request
          • Response
      • Description
      • Exception
      • Resource
    • Stream
  • PHP
  • Psr
    • Log
  • Symfony
    • Component
      • Config
        • Definition
          • Builder
          • Exception
        • Exception
        • Loader
        • Resource
        • Util
      • Console
        • Command
        • Formatter
        • Helper
        • Input
        • Output
        • Tester
      • EventDispatcher
        • Debug
      • Finder
        • Adapter
        • Comparator
        • Exception
        • Expression
        • Iterator
        • Shell
      • Stopwatch
      • Yaml
        • Exception

Classes

  • CurlHandle
  • CurlMulti
  • CurlMultiProxy
  • CurlVersion
  • RequestMediator

Interfaces

  • CurlMultiInterface
  • Overview
  • Namespace
  • Class
  • Tree
  • Todo
  1: <?php
  2: 
  3: namespace Guzzle\Http\Curl;
  4: 
  5: use Guzzle\Common\AbstractHasDispatcher;
  6: use Guzzle\Http\Exception\MultiTransferException;
  7: use Guzzle\Http\Exception\CurlException;
  8: use Guzzle\Http\Message\RequestInterface;
  9: 
 10: /**
 11:  * Send {@see RequestInterface} objects in parallel using curl_multi
 12:  */
 13: class CurlMulti extends AbstractHasDispatcher implements CurlMultiInterface
 14: {
 15:     /**
 16:      * @var resource cURL multi handle.
 17:      */
 18:     protected $multiHandle;
 19: 
 20:     /**
 21:      * @var array Attached {@see RequestInterface} objects.
 22:      */
 23:     protected $requests;
 24: 
 25:     /**
 26:      * @var \SplObjectStorage RequestInterface to CurlHandle hash
 27:      */
 28:     protected $handles;
 29: 
 30:     /**
 31:      * @var array Hash mapping curl handle resource IDs to request objects
 32:      */
 33:     protected $resourceHash;
 34: 
 35:     /**
 36:      * @var array Queued exceptions
 37:      */
 38:     protected $exceptions = array();
 39: 
 40:     /**
 41:      * @var array Requests that succeeded
 42:      */
 43:     protected $successful = array();
 44: 
 45:     /**
 46:      * @var array cURL multi error values and codes
 47:      */
 48:     protected $multiErrors = array(
 49:         CURLM_BAD_HANDLE      => array('CURLM_BAD_HANDLE', 'The passed-in handle is not a valid CURLM handle.'),
 50:         CURLM_BAD_EASY_HANDLE => array('CURLM_BAD_EASY_HANDLE', "An easy handle was not good/valid. It could mean that it isn't an easy handle at all, or possibly that the handle already is in used by this or another multi handle."),
 51:         CURLM_OUT_OF_MEMORY   => array('CURLM_OUT_OF_MEMORY', 'You are doomed.'),
 52:         CURLM_INTERNAL_ERROR  => array('CURLM_INTERNAL_ERROR', 'This can only be returned if libcurl bugs. Please report it to us!')
 53:     );
 54: 
 55:     /**
 56:      * {@inheritdoc}
 57:      */
 58:     public function __construct()
 59:     {
 60:         $this->multiHandle = curl_multi_init();
 61:         // @codeCoverageIgnoreStart
 62:         if ($this->multiHandle === false) {
 63:             throw new CurlException('Unable to create multi handle');
 64:         }
 65:         // @codeCoverageIgnoreEnd
 66:         $this->reset();
 67:     }
 68: 
 69:     /**
 70:      * {@inheritdoc}
 71:      */
 72:     public function __destruct()
 73:     {
 74:         if (is_resource($this->multiHandle)) {
 75:             curl_multi_close($this->multiHandle);
 76:         }
 77:     }
 78: 
 79:     /**
 80:      * {@inheritdoc}
 81:      */
 82:     public function add(RequestInterface $request)
 83:     {
 84:         $this->requests[] = $request;
 85:         // If requests are currently transferring and this is async, then the
 86:         // request must be prepared now as the send() method is not called.
 87:         $this->beforeSend($request);
 88:         $this->dispatch(self::ADD_REQUEST, array('request' => $request));
 89: 
 90:         return $this;
 91:     }
 92: 
 93:     /**
 94:      * {@inheritdoc}
 95:      */
 96:     public function all()
 97:     {
 98:         return $this->requests;
 99:     }
100: 
101:     /**
102:      * {@inheritdoc}
103:      */
104:     public function remove(RequestInterface $request)
105:     {
106:         $this->removeHandle($request);
107:         foreach ($this->requests as $i => $r) {
108:             if ($request === $r) {
109:                 unset($this->requests[$i]);
110:                 $this->requests = array_values($this->requests);
111:                 $this->dispatch(self::REMOVE_REQUEST, array('request' => $request));
112:                 return true;
113:             }
114:         }
115: 
116:         return false;
117:     }
118: 
119:     /**
120:      * {@inheritdoc}
121:      */
122:     public function reset($hard = false)
123:     {
124:         // Remove each request
125:         if ($this->requests) {
126:             foreach ($this->requests as $request) {
127:                 $this->remove($request);
128:             }
129:         }
130: 
131:         $this->handles = new \SplObjectStorage();
132:         $this->requests = $this->resourceHash = $this->exceptions = $this->successful = array();
133:     }
134: 
135:     /**
136:      * {@inheritdoc}
137:      */
138:     public function send()
139:     {
140:         $this->perform();
141:         $exceptions = $this->exceptions;
142:         $successful = $this->successful;
143:         $this->reset();
144: 
145:         if ($exceptions) {
146:             $this->throwMultiException($exceptions, $successful);
147:         }
148:     }
149: 
150:     /**
151:      * {@inheritdoc}
152:      */
153:     public function count()
154:     {
155:         return count($this->requests);
156:     }
157: 
158:     /**
159:      * Build and throw a MultiTransferException
160:      *
161:      * @param array $exceptions Exceptions encountered
162:      * @param array $successful Successful requests
163:      * @throws MultiTransferException
164:      */
165:     protected function throwMultiException(array $exceptions, array $successful)
166:     {
167:         $multiException = new MultiTransferException('Errors during multi transfer');
168: 
169:         while ($e = array_shift($exceptions)) {
170:             $multiException->add($e['exception']);
171:             $multiException->addFailedRequest($e['request']);
172:         }
173: 
174:         // Add successful requests
175:         foreach ($successful as $request) {
176:             if (!$multiException->containsRequest($request)) {
177:                 $multiException->addSuccessfulRequest($request);
178:             }
179:         }
180: 
181:         throw $multiException;
182:     }
183: 
184:     /**
185:      * Prepare for sending
186:      *
187:      * @param RequestInterface $request Request to prepare
188:      * @throws \Exception on error preparing the request
189:      */
190:     protected function beforeSend(RequestInterface $request)
191:     {
192:         try {
193:             // Fix Content-Length and Transfer-Encoding collisions
194:             if ($request->hasHeader('Transfer-Encoding') && $request->hasHeader('Content-Length')) {
195:                 $request->removeHeader('Transfer-Encoding');
196:             }
197:             $request->setState(RequestInterface::STATE_TRANSFER);
198:             $request->dispatch('request.before_send', array('request' => $request));
199:             if ($request->getState() != RequestInterface::STATE_TRANSFER) {
200:                 // Requests might decide they don't need to be sent just before transfer (e.g. CachePlugin)
201:                 $this->remove($request);
202:                 if ($request->getState() == RequestInterface::STATE_COMPLETE) {
203:                     $this->successful[] = $request;
204:                 }
205:             } else {
206:                 // Add the request curl handle to the multi handle
207:                 $this->checkCurlResult(curl_multi_add_handle($this->multiHandle, $this->createCurlHandle($request)->getHandle()));
208:             }
209:         } catch (\Exception $e) {
210:             // Queue the exception to be thrown when sent
211:             $this->removeErroredRequest($request, $e);
212:         }
213:     }
214: 
215:     /**
216:      * Create a curl handle for a request
217:      *
218:      * @param RequestInterface $request Request
219:      *
220:      * @return CurlHandle
221:      */
222:     protected function createCurlHandle(RequestInterface $request)
223:     {
224:         $wrapper = CurlHandle::factory($request);
225:         $this->handles[$request] = $wrapper;
226:         $this->resourceHash[(int) $wrapper->getHandle()] = $request;
227: 
228:         return $wrapper;
229:     }
230: 
231:     /**
232:      * Get the data from the multi handle
233:      */
234:     protected function perform()
235:     {
236:         if (!$this->requests) {
237:             return;
238:         }
239: 
240:         // Initialize the handles with a very quick select timeout
241:         $active = $mrc = null;
242:         $this->executeHandles($active, $mrc, 0.001);
243:         $event = array('curl_multi' => $this);
244: 
245:         while (1) {
246: 
247:             $this->processMessages();
248: 
249:             // Exit the function if there are no more requests to send
250:             if (!$this->requests) {
251:                 break;
252:             }
253: 
254:             // Notify each request as polling
255:             $blocking = $total = 0;
256:             foreach ($this->requests as $request) {
257:                 $event['request'] = $request;
258:                 $request->dispatch(self::POLLING_REQUEST, $event);
259:                 ++$total;
260:                 // The blocking variable just has to be non-falsey to block the loop
261:                 if ($request->getParams()->hasKey(self::BLOCKING)) {
262:                     ++$blocking;
263:                 }
264:             }
265: 
266:             if ($blocking == $total) {
267:                 // Sleep to prevent eating CPU because no requests are actually pending a select call
268:                 usleep(500);
269:             } else {
270:                 do {
271:                     $this->executeHandles($active, $mrc, 1);
272:                 } while ($active);
273:             }
274:         }
275:     }
276: 
277:     /**
278:      * Process any received curl multi messages
279:      */
280:     private function processMessages()
281:     {
282:         // Get messages from curl handles
283:         while ($done = curl_multi_info_read($this->multiHandle)) {
284:             try {
285:                 $request = $this->resourceHash[(int) $done['handle']];
286:                 $this->processResponse($request, $this->handles[$request], $done);
287:                 $this->successful[] = $request;
288:             } catch (MultiTransferException $e) {
289:                 $this->removeErroredRequest($request, $e, false);
290:                 throw $e;
291:             } catch (\Exception $e) {
292:                 $this->removeErroredRequest($request, $e);
293:             }
294:         }
295:     }
296: 
297:     /**
298:      * Execute and select curl handles until there is activity
299:      *
300:      * @param int $active  Active value to update
301:      * @param int $mrc     Multi result value to update
302:      * @param int $timeout Select timeout in seconds
303:      */
304:     private function executeHandles(&$active, &$mrc, $timeout = 1)
305:     {
306:         do {
307:             $mrc = curl_multi_exec($this->multiHandle, $active);
308:         } while ($mrc == CURLM_CALL_MULTI_PERFORM && $active);
309:         $this->checkCurlResult($mrc);
310: 
311:         // @codeCoverageIgnoreStart
312:         // Select the curl handles until there is any activity on any of the open file descriptors
313:         // See https://github.com/php/php-src/blob/master/ext/curl/multi.c#L170
314:         if ($active && $mrc == CURLM_OK && curl_multi_select($this->multiHandle, $timeout) == -1) {
315:             // Perform a usleep if a previously executed select returned -1
316:             // @see https://bugs.php.net/bug.php?id=61141
317:             usleep(100);
318:         }
319:         // @codeCoverageIgnoreEnd
320:     }
321: 
322:     /**
323:      * Remove a request that encountered an exception
324:      *
325:      * @param RequestInterface $request Request to remove
326:      * @param \Exception       $e       Exception encountered
327:      * @param bool             $buffer  Set to false to not buffer the exception
328:      */
329:     protected function removeErroredRequest(RequestInterface $request, \Exception $e = null, $buffer = true)
330:     {
331:         if ($buffer) {
332:             $this->exceptions[] = array('request' => $request, 'exception' => $e);
333:         }
334: 
335:         $this->remove($request);
336:         $request->setState(RequestInterface::STATE_ERROR);
337:         $this->dispatch(self::MULTI_EXCEPTION, array('exception' => $e, 'all_exceptions' => $this->exceptions));
338:     }
339: 
340:     /**
341:      * Check for errors and fix headers of a request based on a curl response
342:      *
343:      * @param RequestInterface $request Request to process
344:      * @param CurlHandle       $handle  Curl handle object
345:      * @param array            $curl    Array returned from curl_multi_info_read
346:      *
347:      * @throws CurlException on Curl error
348:      */
349:     protected function processResponse(RequestInterface $request, CurlHandle $handle, array $curl)
350:     {
351:         // Set the transfer stats on the response
352:         $handle->updateRequestFromTransfer($request);
353:         // Check if a cURL exception occurred, and if so, notify things
354:         $curlException = $this->isCurlException($request, $handle, $curl);
355: 
356:         // Always remove completed curl handles.  They can be added back again
357:         // via events if needed (e.g. ExponentialBackoffPlugin)
358:         $this->removeHandle($request);
359: 
360:         if (!$curlException) {
361:             $request->setState(RequestInterface::STATE_COMPLETE, array('handle' => $handle));
362:             // Only remove the request if it wasn't resent as a result of the state change
363:             if ($request->getState() != RequestInterface::STATE_TRANSFER) {
364:                 $this->remove($request);
365:             }
366:         } else {
367:             // Set the state of the request to an error
368:             $request->setState(RequestInterface::STATE_ERROR);
369:             // Notify things that listen to the request of the failure
370:             $request->dispatch('request.exception', array(
371:                 'request'   => $this,
372:                 'exception' => $curlException
373:             ));
374: 
375:             // Allow things to ignore the error if possible
376:             $state = $request->getState();
377:             if ($state != RequestInterface::STATE_TRANSFER) {
378:                 $this->remove($request);
379:             }
380:             // The error was not handled, so fail
381:             if ($state == RequestInterface::STATE_ERROR) {
382:                 /** @var CurlException $curlException */
383:                 throw $curlException;
384:             }
385:         }
386:     }
387: 
388:     /**
389:      * Remove a curl handle from the curl multi object
390:      *
391:      * @param RequestInterface $request Request that owns the handle
392:      */
393:     protected function removeHandle(RequestInterface $request)
394:     {
395:         if (isset($this->handles[$request])) {
396:             $handle = $this->handles[$request];
397:             unset($this->handles[$request]);
398:             unset($this->resourceHash[(int) $handle->getHandle()]);
399:             curl_multi_remove_handle($this->multiHandle, $handle->getHandle());
400:             $handle->close();
401:         }
402:     }
403: 
404:     /**
405:      * Check if a cURL transfer resulted in what should be an exception
406:      *
407:      * @param RequestInterface $request Request to check
408:      * @param CurlHandle       $handle  Curl handle object
409:      * @param array            $curl    Array returned from curl_multi_info_read
410:      *
411:      * @return CurlException|bool
412:      */
413:     private function isCurlException(RequestInterface $request, CurlHandle $handle, array $curl)
414:     {
415:         if (CURLM_OK == $curl['result'] || CURLM_CALL_MULTI_PERFORM == $curl['result']) {
416:             return false;
417:         }
418: 
419:         $handle->setErrorNo($curl['result']);
420:         $e = new CurlException(sprintf('[curl] %s: %s [url] %s',
421:             $handle->getErrorNo(), $handle->getError(), $handle->getUrl()));
422:         $e->setCurlHandle($handle)
423:             ->setRequest($request)
424:             ->setCurlInfo($handle->getInfo())
425:             ->setError($handle->getError(), $handle->getErrorNo());
426: 
427:         return $e;
428:     }
429: 
430:     /**
431:      * Throw an exception for a cURL multi response if needed
432:      *
433:      * @param int $code Curl response code
434:      * @throws CurlException
435:      */
436:     private function checkCurlResult($code)
437:     {
438:         if ($code != CURLM_OK && $code != CURLM_CALL_MULTI_PERFORM) {
439:             throw new CurlException(isset($this->multiErrors[$code])
440:                 ? "cURL error: {$code} ({$this->multiErrors[$code][0]}): cURL message: {$this->multiErrors[$code][1]}"
441:                 : 'Unexpected cURL error: ' . $code
442:             );
443:         }
444:     }
445: }
446: 
php-coveralls API documentation generated by ApiGen 2.8.0