You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
IYUUAutoReseed/vendor/owner888/phpspider/core/phpspider.bak20170807.php

2870 lines
87 KiB

<?php
// +----------------------------------------------------------------------
// | PHPSpider [ A PHP Framework For Crawler ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2014 https://doc.phpspider.org All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: Seatle Yang <seatle@foxmail.com>
// +----------------------------------------------------------------------
//----------------------------------
// PHPSpider核心类文件
//----------------------------------
namespace phpspider\core;
require_once __DIR__ . '/constants.php';
use phpspider\core\requests;
use phpspider\core\selector;
use phpspider\core\queue;
use phpspider\core\db;
use phpspider\core\util;
use phpspider\core\log;
use Exception;
//require CORE.'/log.php';
//require CORE.'/requests.php';
//require CORE.'/selector.php';
//require CORE.'/util.php';
//require CORE.'/db.php';
//require CORE.'/cache.php';
//require CORE."/worker.php";
//require CORE."/phpspider.php";
// 启动的时候生成data目录
util::path_exists(PATH_DATA);
util::path_exists(PATH_DATA."/lock");
util::path_exists(PATH_DATA."/log");
util::path_exists(PATH_DATA."/cache");
util::path_exists(PATH_DATA."/status");
class phpspider
{
/**
* 版本号
* @var string
*/
const VERSION = '3.0.4';
/**
* 爬虫爬取每个网页的时间间隔,0表示不延时, 单位: 毫秒
*/
const INTERVAL = 0;
/**
* 爬虫爬取每个网页的超时时间, 单位: 秒
*/
const TIMEOUT = 5;
/**
* 爬取失败次数, 不想失败重新爬取则设置为0
*/
const MAX_TRY = 0;
/**
* 爬虫爬取网页所使用的浏览器类型: pc、ios、android
* 默认类型是PC
*/
const AGENT_PC = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36";
const AGENT_IOS = "Mozilla/5.0 (iPhone; CPU iPhone OS 9_3_3 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13G34 Safari/601.1";
const AGENT_ANDROID = "Mozilla/5.0 (Linux; U; Android 6.0.1;zh_cn; Le X820 Build/FEXCNFN5801507014S) AppleWebKit/537.36 (KHTML, like Gecko)Version/4.0 Chrome/49.0.0.0 Mobile Safari/537.36 EUI Browser/5.8.015S";
/**
* pid文件的路径及名称
* @var string
*/
//public static $pid_file = '';
/**
* 日志目录, 默认在data根目录下
* @var mixed
*/
//public static $log_file = '';
/**
* 主任务进程ID
*/
//public static $master_pid = 0;
/**
* 所有任务进程ID
*/
//public static $taskpids = array();
/**
* Daemonize.
*
* @var bool
*/
public static $daemonize = false;
/**
* 当前进程是否终止
*/
public static $terminate = false;
/**
* 是否分布式
*/
public static $multiserver = false;
/**
* 当前服务器ID
*/
public static $serverid = 1;
/**
* 主任务进程
*/
public static $taskmaster = true;
/**
* 当前任务ID
*/
public static $taskid = 1;
/**
* 当前任务进程ID
*/
public static $taskpid = 1;
/**
* 并发任务数
*/
public static $tasknum = 1;
/**
* 生成
*/
public static $fork_task_complete = false;
/**
* 是否使用Redis
*/
public static $use_redis = false;
/**
* 是否保存爬虫运行状态
*/
public static $save_running_state = false;
/**
* 配置
*/
public static $configs = array();
/**
* 要抓取的URL队列
md5(url) => array(
'url' => '', // 要爬取的URL
'url_type' => '', // 要爬取的URL类型,scan_page、list_page、content_page
'method' => 'get', // 默认为"GET"请求, 也支持"POST"请求
'headers' => array(), // 此url的Headers, 可以为空
'params' => array(), // 发送请求时需添加的参数, 可以为空
'context_data'=> '', // 此url附加的数据, 可以为空
'proxies' => false, // 是否使用代理
'try_num' => 0 // 抓取次数
'max_try' => 0 // 允许抓取失败次数
)
*/
public static $collect_queue = array();
/**
* 要抓取的URL数组
* md5($url) => time()
*/
public static $collect_urls = array();
/**
* 要抓取的URL数量
*/
public static $collect_urls_num = 0;
/**
* 已经抓取的URL数量
*/
public static $collected_urls_num = 0;
/**
* 当前进程采集成功数
*/
public static $collect_succ = 0;
/**
* 当前进程采集失败数
*/
public static $collect_fail = 0;
/**
* 提取到的字段数
*/
public static $fields_num = 0;
/**
* 采集深度
*/
public static $depth_num = 0;
/**
* 爬虫开始时间
*/
public static $time_start = 0;
/**
* 任务状态
*/
public static $task_status = array();
// 导出类型配置
public static $export_type = '';
public static $export_file = '';
public static $export_conf = '';
public static $export_table = '';
// 数据库配置
public static $db_config = array();
// 队列配置
public static $queue_config = array();
// 运行面板参数长度
public static $server_length = 10;
public static $tasknum_length = 8;
public static $taskid_length = 8;
public static $pid_length = 8;
public static $mem_length = 8;
public static $urls_length = 15;
public static $speed_length = 6;
/**
* 爬虫初始化时调用, 用来指定一些爬取前的操作
*
* @var mixed
* @access public
*/
public $on_start = null;
/**
* 网页状态码回调
*
* @var mixed
* @access public
*/
public $on_status_code = null;
/**
* 判断当前网页是否被反爬虫, 需要开发者实现
*
* @var mixed
* @access public
*/
public $is_anti_spider = null;
/**
* 在一个网页下载完成之后调用, 主要用来对下载的网页进行处理
*
* @var mixed
* @access public
*/
public $on_download_page = null;
/**
* 在一个attached_url对应的网页下载完成之后调用. 主要用来对下载的网页进行处理
*
* @var mixed
* @access public
*/
public $on_download_attached_page = null;
/**
* 当前页面抽取到URL
*
* @var mixed
* @access public
*/
public $on_fetch_url = null;
/**
* URL属于入口页
* 在爬取到入口url的内容之后, 添加新的url到待爬队列之前调用
* 主要用来发现新的待爬url, 并且能给新发现的url附加数据
*
* @var mixed
* @access public
*/
public $on_scan_page = null;
/**
* URL属于列表页
* 在爬取到列表页url的内容之后, 添加新的url到待爬队列之前调用
* 主要用来发现新的待爬url, 并且能给新发现的url附加数据
*
* @var mixed
* @access public
*/
public $on_list_page = null;
/**
* URL属于内容页
* 在爬取到内容页url的内容之后, 添加新的url到待爬队列之前调用
* 主要用来发现新的待爬url, 并且能给新发现的url附加数据
*
* @var mixed
* @access public
*/
public $on_content_page = null;
/**
* 在抽取到field内容之后调用, 对其中包含的img标签进行回调处理
*
* @var mixed
* @access public
*/
public $on_handle_img = null;
/**
* 当一个field的内容被抽取到后进行的回调, 在此回调中可以对网页中抽取的内容作进一步处理
*
* @var mixed
* @access public
*/
public $on_extract_field = null;
/**
* 在一个网页的所有field抽取完成之后, 可能需要对field进一步处理, 以发布到自己的网站
*
* @var mixed
* @access public
*/
public $on_extract_page = null;
/**
* 如果抓取的页面是一个附件文件, 比如图片、视频、二进制文件、apk、ipad、exe
* 就不去分析他的内容提取field了, 提取field只针对HTML
*
* @var mixed
* @access public
*/
public $on_attachment_file = null;
function __construct($configs = array())
{
// 产生时钟云,解决php7下面ctrl+c无法停止bug
declare(ticks = 1);
// 先打开以显示验证报错内容
log::$log_show = true;
log::$log_file = isset($configs['log_file']) ? $configs['log_file'] : PATH_DATA.'/phpspider.log';
log::$log_type = isset($configs['log_type']) ? $configs['log_type'] : false;
// 彩蛋
$included_files = get_included_files();
$content = file_get_contents($included_files[0]);
if (!preg_match("#/\* Do NOT delete this comment \*/#", $content) || !preg_match("#/\* 不要删除这段注释 \*/#", $content))
{
$msg = "Unknown error...";
log::error($msg);
exit;
}
$configs['name'] = isset($configs['name']) ? $configs['name'] : 'phpspider';
$configs['proxies'] = isset($configs['proxies']) ? $configs['proxies'] : '';
$configs['user_agent'] = isset($configs['user_agent']) ? $configs['user_agent'] : self::AGENT_PC;
$configs['user_agents'] = isset($configs['user_agents']) ? $configs['user_agents'] : null;
$configs['client_ip'] = isset($configs['client_ip']) ? $configs['client_ip'] : null;
$configs['client_ips'] = isset($configs['client_ips']) ? $configs['client_ips'] : null;
$configs['interval'] = isset($configs['interval']) ? $configs['interval'] : self::INTERVAL;
$configs['timeout'] = isset($configs['timeout']) ? $configs['timeout'] : self::TIMEOUT;
$configs['max_try'] = isset($configs['max_try']) ? $configs['max_try'] : self::MAX_TRY;
$configs['max_depth'] = isset($configs['max_depth']) ? $configs['max_depth'] : 0;
$configs['max_fields'] = isset($configs['max_fields']) ? $configs['max_fields'] : 0;
$configs['export'] = isset($configs['export']) ? $configs['export'] : array();
// csv、sql、db
self::$export_type = isset($configs['export']['type']) ? $configs['export']['type'] : '';
self::$export_file = isset($configs['export']['file']) ? $configs['export']['file'] : '';
self::$export_table = isset($configs['export']['table']) ? $configs['export']['table'] : '';
self::$db_config = isset($configs['db_config']) ? $configs['db_config'] : array();
self::$queue_config = isset($configs['queue_config']) ? $configs['queue_config'] : array();
// 是否设置了并发任务数, 并且大于1, 而且不是windows环境
if (isset($configs['tasknum']) && $configs['tasknum'] > 1 && !util::is_win())
{
self::$tasknum = $configs['tasknum'];
}
// 是否设置了保留运行状态
if (isset($configs['save_running_state']))
{
self::$save_running_state = $configs['save_running_state'];
}
// 是否分布式
if (isset($configs['multiserver']))
{
self::$multiserver = $configs['multiserver'];
}
// 当前服务器ID
if (isset($configs['serverid']))
{
self::$serverid = $configs['serverid'];
}
// 不同项目的采集以采集名称作为前缀区分
if (isset($GLOBALS['config']['redis']['prefix']))
{
$GLOBALS['config']['redis']['prefix'] = $GLOBALS['config']['redis']['prefix'].'-'.md5($configs['name']);
}
self::$configs = $configs;
}
public function add_scan_url($url, $options = array(), $allowed_repeat = true)
{
// 投递状态
$status = false;
$link = $options;
$link['url'] = $url;
$link['url_type'] = 'scan_page';
$link = $this->link_uncompress($link);
if ($this->is_list_page($url))
{
$link['url_type'] = 'list_page';
$status = $this->queue_lpush($link, $allowed_repeat);
}
elseif ($this->is_content_page($url))
{
$link['url_type'] = 'content_page';
$status = $this->queue_lpush($link, $allowed_repeat);
}
else
{
$status = $this->queue_lpush($link, $allowed_repeat);
}
if ($status)
{
if ($link['url_type'] == 'scan_page')
{
log::debug("Find scan page: {$url}");
}
elseif ($link['url_type'] == 'list_page')
{
log::debug("Find list page: {$url}");
}
elseif ($link['url_type'] == 'content_page')
{
log::debug("Find content page: {$url}");
}
}
return $status;
}
/**
* 一般在 on_scan_page 和 on_list_page 回调函数中调用, 用来往待爬队列中添加url
* 两个进程同时调用这个方法, 传递相同url的时候, 就会出现url重复进入队列
*
* @param mixed $url
* @param mixed $options
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-18 10:17
*/
public function add_url($url, $options = array(), $depth = 0)
{
// 投递状态
$status = false;
$link = $options;
$link['url'] = $url;
$link['depth'] = $depth;
$link = $this->link_uncompress($link);
if ($this->is_list_page($url))
{
$link['url_type'] = 'list_page';
$status = $this->queue_lpush($link);
}
if ($this->is_content_page($url))
{
$link['url_type'] = 'content_page';
$status = $this->queue_lpush($link);
}
if ($status)
{
if ($link['url_type'] == 'scan_page')
{
log::debug("Find scan page: {$url}");
}
elseif ($link['url_type'] == 'list_page')
{
log::debug("Find list page: {$url}");
}
elseif ($link['url_type'] == 'content_page')
{
log::debug("Find content page: {$url}");
}
}
return $status;
}
/**
* 是否入口页面
*
* @param mixed $url
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-10-12 19:06
*/
public function is_scan_page($url)
{
$parse_url = parse_url($url);
if (empty($parse_url['host']) || !in_array($parse_url['host'], self::$configs['domains']))
{
return false;
}
return true;
}
/**
* 是否列表页面
*
* @param mixed $url
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-10-12 19:06
*/
public function is_list_page($url)
{
$result = false;
if (!empty(self::$configs['list_url_regexes']))
{
foreach (self::$configs['list_url_regexes'] as $regex)
{
if (preg_match("#{$regex}#i", $url))
{
$result = true;
break;
}
}
}
return $result;
}
/**
* 是否内容页面
*
* @param mixed $url
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-10-12 19:06
*/
public function is_content_page($url)
{
$result = false;
if (!empty(self::$configs['content_url_regexes']))
{
foreach (self::$configs['content_url_regexes'] as $regex)
{
if (preg_match("#{$regex}#i", $url))
{
$result = true;
break;
}
}
}
return $result;
}
/**
* Parse command.
* php yourfile.php start | stop | status | kill
*
* @return void
*/
public function parse_command()
{
// 检查运行命令的参数
global $argv;
$start_file = $argv[0];
// 命令
$command = isset($argv[1]) ? trim($argv[1]) : 'start';
// 子命令, 目前只支持-d
$command2 = isset($argv[2]) ? $argv[2] : '';
// 根据命令做相应处理
switch($command)
{
// 启动 phpspider
case 'start':
if ($command2 === '-d')
{
self::$daemonize = true;
}
break;
case 'stop':
exec("ps aux | grep $start_file | grep -v grep | awk '{print $2}'", $info);
if (count($info) <= 1)
{
echo "PHPSpider[$start_file] not run\n";
}
else
{
//echo "PHPSpider[$start_file] is stoping ...\n";
echo "PHPSpider[$start_file] stop success";
exec("ps aux | grep $start_file | grep -v grep | awk '{print $2}' |xargs kill -SIGINT", $info);
}
exit;
break;
case 'kill':
exec("ps aux | grep $start_file | grep -v grep | awk '{print $2}' |xargs kill -SIGKILL");
break;
// 显示 phpspider 运行状态
case 'status':
exit(0);
// 未知命令
default :
exit("Usage: php yourfile.php {start|stop|status|kill}\n");
}
}
/**
* Signal hander.
*
* @param int $signal
*/
public function signal_handler($signal)
{
switch ($signal) {
// Stop.
case SIGINT:
log::warn("Program stopping...");
self::$terminate = true;
break;
// Show status.
case SIGUSR2:
echo "show status\n";
break;
}
}
/**
* Install signal handler.
*
* @return void
*/
public function install_signal()
{
if (function_exists('pcntl_signal'))
{
// stop
pcntl_signal(SIGINT, array(__CLASS__, 'signal_handler'), false);
// status
pcntl_signal(SIGUSR2, array(__CLASS__, 'signal_handler'), false);
// ignore
pcntl_signal(SIGPIPE, SIG_IGN, false);
}
}
/**
* Run as deamon mode.
*
* @throws Exception
*/
protected static function daemonize()
{
if (!self::$daemonize)
{
return;
}
// fork前一定要关闭redis
queue::clear_link();
umask(0);
$pid = pcntl_fork();
if (-1 === $pid)
{
throw new Exception('fork fail');
}
elseif ($pid > 0)
{
exit(0);
}
if (-1 === posix_setsid())
{
throw new Exception("setsid fail");
}
// Fork again avoid SVR4 system regain the control of terminal.
$pid = pcntl_fork();
if (-1 === $pid)
{
throw new Exception("fork fail");
}
elseif (0 !== $pid)
{
exit(0);
}
}
/**
* 检查是否终止当前进程
*
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-11-16 11:06
*/
public function check_terminate()
{
if (!self::$terminate)
{
return false;
}
// 删除当前任务状态
$this->del_task_status(self::$serverid, self::$taskid);
if (self::$taskmaster)
{
// 检查子进程是否都退出
while (true)
{
$all_stop = true;
for ($i = 2; $i <= self::$tasknum; $i++)
{
// 只要一个还活着就说明没有完全退出
$task_status = $this->get_task_status(self::$serverid, $i);
if ($task_status)
{
$all_stop = false;
}
}
if ($all_stop)
{
break;
}
else
{
log::warn("Task stop waiting...");
}
sleep(1);
}
$this->del_server_list(self::$serverid);
// 显示最后结果
log::$log_show = true;
$spider_time_run = util::time2second(intval(microtime(true) - self::$time_start));
log::note("Spider finished in {$spider_time_run}");
$get_collected_url_num = $this->get_collected_url_num();
log::note("Total pages: {$get_collected_url_num} \n");
}
exit();
}
public function start()
{
$this->parse_command();
// 爬虫开始时间
self::$time_start = time();
// 当前任务ID
self::$taskid = 1;
// 当前任务进程ID
self::$taskpid = function_exists('posix_getpid') ? posix_getpid() : 1;
self::$collect_succ = 0;
self::$collect_fail = 0;
//--------------------------------------------------------------------------------
// 运行前验证
//--------------------------------------------------------------------------------
// 检查PHP版本
if (version_compare(PHP_VERSION, '5.3.0', 'lt'))
{
log::error('PHP 5.3+ is required, currently installed version is: ' . phpversion());
exit;
}
// 检查CURL扩展
if(!function_exists('curl_init'))
{
log::error("The curl extension was not found");
exit;
}
// 多任务需要pcntl扩展支持
if (self::$tasknum > 1 && !function_exists('pcntl_fork'))
{
log::error("Multitasking needs pcntl, the pcntl extension was not found");
exit;
}
// 守护进程需要pcntl扩展支持
if (self::$daemonize && !function_exists('pcntl_fork'))
{
log::error("Daemonize needs pcntl, the pcntl extension was not found");
exit;
}
// 集群、保存运行状态、多任务都需要Redis支持
if (self::$multiserver || self::$save_running_state || self::$tasknum > 1)
{
self::$use_redis = true;
queue::set_connect('default', self::$queue_config);
if (!queue::init())
{
if (self::$multiserver)
{
log::error("Multiserver needs Redis support, ".queue::$error);
exit;
}
if (self::$tasknum > 1)
{
log::error("Multitasking needs Redis support, ".queue::$error);
exit;
}
if (self::$save_running_state)
{
log::error("Spider kept running state needs Redis support, ".queue::$error);
exit;
}
}
}
// 检查导出
$this->check_export();
// 检查缓存
$this->check_cache();
// 检查 scan_urls
if (empty(self::$configs['scan_urls']))
{
log::error("No scan url to start");
exit;
}
foreach ( self::$configs['scan_urls'] as $url )
{
// 只检查配置中的入口URL, 通过 add_scan_url 添加的不检查了.
if (!$this->is_scan_page($url))
{
log::error("Domain of scan_urls (\"{$url}\") does not match the domains of the domain name");
exit;
}
}
// windows 下没法显示面板, 强制显示日志
if (util::is_win())
{
self::$configs['name'] = iconv("UTF-8", "GB2312//IGNORE", self::$configs['name']);
log::$log_show = true;
}
else
{
log::$log_show = isset(self::$configs['log_show']) ? self::$configs['log_show'] : false;
}
if (self::$daemonize)
{
log::$log_show = true;
}
if (log::$log_show)
{
global $argv;
$start_file = $argv[0];
$header = "";
if (!util::is_win()) $header .= "\033[33m";
$header .= "\n[ ".self::$configs['name']." Spider ] is started...\n\n";
$header .= " * PHPSpider Version: ".self::VERSION."\n";
$header .= " * Documentation: https://doc.phpspider.org\n";
$header .= " * Task Number: ".self::$tasknum."\n\n";
$header .= "Input \"php $start_file stop\" to quit. Start success.\n";
if (!util::is_win()) $header .= "\033[0m";
log::note($header);
}
// 如果是守护进程,恢复日志状态
if (self::$daemonize)
{
log::$log_show = isset(self::$configs['log_show']) ? self::$configs['log_show'] : false;
}
// 多任务和分布式都要清掉, 当然分布式只清自己的
$this->init_redis();
//--------------------------------------------------------------------------------
// 生成多任务
//--------------------------------------------------------------------------------
// 添加入口URL到队列
foreach ( self::$configs['scan_urls'] as $url )
{
// false 表示不允许重复
$this->add_scan_url($url, null, false);
}
// 放这个位置, 可以添加入口页面
if ($this->on_start)
{
call_user_func($this->on_start, $this);
}
if (!self::$daemonize)
{
if (!log::$log_show)
{
// 第一次先清屏
$this->clear_echo();
// 先显示一次面板, 然后下面再每次采集成功显示一次
$this->display_ui();
}
}
else
{
$this->daemonize();
}
// 安装信号
$this->install_signal();
// 开始采集
$this->do_collect_page();
// 从服务器列表中删除当前服务器信息
$this->del_server_list(self::$serverid);
}
/**
* 创建一个子进程
* @param Worker $worker
* @throws Exception
*/
public function fork_one_task($taskid)
{
$pid = pcntl_fork();
// 主进程记录子进程pid
if($pid > 0)
{
// 暂时没用
//self::$taskpids[$taskid] = $pid;
}
// 子进程运行
elseif(0 === $pid)
{
log::warn("Fork children task({$taskid}) successful...");
// 初始化子进程参数
self::$time_start = microtime(true);
self::$taskid = $taskid;
self::$taskmaster = false;
self::$taskpid = posix_getpid();
self::$collect_succ = 0;
self::$collect_fail = 0;
$this->do_collect_page();
// 这里用0表示正常退出
exit(0);
}
else
{
log::error("Fork children task({$taskid}) fail...");
exit;
}
}
public function do_collect_page()
{
queue::set_connect('default', self::$queue_config);
queue::init();
while( $queue_lsize = $this->queue_lsize() )
{
// 如果是主任务
if (self::$taskmaster)
{
// 多任务下主任务未准备就绪
if (self::$tasknum > 1 && !self::$fork_task_complete)
{
// 主进程采集到两倍于任务数时, 生成子任务一起采集
if ( $queue_lsize > self::$tasknum*2 )
{
self::$fork_task_complete = true;
// fork 子进程前一定要先干掉redis连接fd, 不然会存在进程互抢redis fd 问题
queue::clear_link();
// task进程从2开始, 1被master进程所使用
for ($i = 2; $i <= self::$tasknum; $i++)
{
$this->fork_one_task($i);
}
}
}
// 抓取页面
$this->collect_page();
// 保存任务状态
$this->set_task_status();
// 每采集成功一次页面, 就刷新一次面板
if (!log::$log_show && !self::$daemonize)
{
$this->display_ui();
}
}
// 如果是子任务
else
{
// 如果队列中的网页比任务数2倍多, 子任务可以采集, 否则等待...
if ( $queue_lsize > self::$tasknum*2 )
{
// 抓取页面
$this->collect_page();
// 保存任务状态
$this->set_task_status();
}
else
{
log::warn("Task(".self::$taskid.") waiting...");
sleep(1);
}
}
// 检查进程是否收到关闭信号
$this->check_terminate();
}
}
/**
* 爬取页面
*
* @param mixed $collect_url 要抓取的链接
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-18 10:17
*/
public function collect_page()
{
$get_collect_url_num = $this->get_collect_url_num();
log::info("Find pages: {$get_collect_url_num} ");
$queue_lsize = $this->queue_lsize();
log::info("Waiting for collect pages: {$queue_lsize} ");
$get_collected_url_num = $this->get_collected_url_num();
log::info("Collected pages: {$get_collected_url_num} ");
// 多任务的时候输出爬虫序号
if (self::$tasknum > 1)
{
log::info("Current task id: ".self::$taskid);
}
// 先进先出
$link = $this->queue_rpop();
$link = $this->link_uncompress($link);
$url = $link['url'];
// 标记为已爬取网页
$this->incr_collected_url_num($url);
// 爬取页面开始时间
$page_time_start = microtime(true);
requests::$input_encoding = null;
$html = $this->request_url($url, $link);
if (!$html)
{
return false;
}
// 当前正在爬取的网页页面的对象
$page = array(
'url' => $url,
'raw' => $html,
'request' => array(
'url' => $url,
'method' => $link['method'],
'headers' => $link['headers'],
'params' => $link['params'],
'context_data' => $link['context_data'],
'try_num' => $link['try_num'],
'max_try' => $link['max_try'],
'depth' => $link['depth'],
'taskid' => self::$taskid,
),
);
unset($html);
//--------------------------------------------------------------------------------
// 处理回调函数
//--------------------------------------------------------------------------------
// 判断当前网页是否被反爬虫了, 需要开发者实现
if ($this->is_anti_spider)
{
$is_anti_spider = call_user_func($this->is_anti_spider, $url, $page['raw'], $this);
// 如果在回调函数里面判断被反爬虫并且返回true
if ($is_anti_spider)
{
return false;
}
}
// 在一个网页下载完成之后调用. 主要用来对下载的网页进行处理.
// 比如下载了某个网页, 希望向网页的body中添加html标签
if ($this->on_download_page)
{
$return = call_user_func($this->on_download_page, $page, $this);
// 针对那些老是忘记return的人
if (isset($return)) $page = $return;
}
// 是否从当前页面分析提取URL
// 回调函数如果返回false表示不需要再从此网页中发现待爬url
$is_find_url = true;
if ($link['url_type'] == 'scan_page')
{
if ($this->on_scan_page)
{
$return = call_user_func($this->on_scan_page, $page, $page['raw'], $this);
if (isset($return)) $is_find_url = $return;
}
}
elseif ($link['url_type'] == 'list_page')
{
if ($this->on_list_page)
{
$return = call_user_func($this->on_list_page, $page, $page['raw'], $this);
if (isset($return)) $is_find_url = $return;
}
}
elseif ($link['url_type'] == 'content_page')
{
if ($this->on_content_page)
{
$return = call_user_func($this->on_content_page, $page, $page['raw'], $this);
if (isset($return)) $is_find_url = $return;
}
}
// on_scan_page、on_list_page、on_content_page 返回false表示不需要再从此网页中发现待爬url
if ($is_find_url)
{
// 如果深度没有超过最大深度, 获取下一级URL
if (self::$configs['max_depth'] == 0 || $link['depth'] < self::$configs['max_depth'])
{
// 分析提取HTML页面中的URL
$this->get_urls($page['raw'], $url, $link['depth'] + 1);
}
}
// 如果是内容页, 分析提取HTML页面中的字段
// 列表页也可以提取数据的, source_type: urlcontext, 未实现
if ($link['url_type'] == 'content_page')
{
$this->get_html_fields($page['raw'], $url, $page);
}
// 如果当前深度大于缓存的, 更新缓存
$this->incr_depth_num($link['depth']);
// 处理页面耗时时间
$time_run = round(microtime(true) - $page_time_start, 3);
log::debug("Success process page {$url} in {$time_run} s");
$spider_time_run = util::time2second(intval(microtime(true) - self::$time_start));
log::info("Spider running in {$spider_time_run}");
// 爬虫爬取每个网页的时间间隔, 单位: 毫秒
if (!isset(self::$configs['interval']))
{
// 默认睡眠100毫秒, 太快了会被认为是ddos
self::$configs['interval'] = 100;
}
usleep(self::$configs['interval'] * 1000);
}
/**
* 下载网页, 得到网页内容
*
* @param mixed $url
* @param mixed $link
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-18 10:17
*/
public function request_url($url, $link = array())
{
$time_start = microtime(true);
//$url = "http://www.qiushibaike.com/article/117568316";
// 设置了编码就不要让requests去判断了
if (isset(self::$configs['input_encoding']))
{
requests::$input_encoding = self::$configs['input_encoding'];
}
// 得到的编码如果不是utf-8的要转成utf-8, 因为xpath只支持utf-8
requests::$output_encoding = 'utf-8';
requests::set_timeout(self::$configs['timeout']);
requests::set_useragent(self::$configs['user_agent']);
if (self::$configs['user_agents'])
{
requests::set_useragents(self::$configs['user_agents']);
}
if (self::$configs['client_ip'])
{
requests::set_client_ip(self::$configs['client_ip']);
}
if (self::$configs['client_ips'])
{
requests::set_client_ips(self::$configs['client_ips']);
}
// 是否设置了代理
if (!empty($link['proxies']))
{
requests::set_proxies($link['proxies']);
// 自动切换IP
requests::set_header('Proxy-Switch-Ip', 'yes');
}
// 如何设置了 HTTP Headers
if (!empty($link['headers']))
{
foreach ($link['headers'] as $k=>$v)
{
requests::set_header($k, $v);
}
}
$method = empty($link['method']) ? 'get' : strtolower($link['method']);
$params = empty($link['params']) ? array() : $link['params'];
$html = requests::$method($url, $params);
// 此url附加的数据不为空, 比如内容页需要列表页一些数据, 拼接到后面去
if ($html && !empty($link['context_data']))
{
$html .= $link['context_data'];
}
$http_code = requests::$status_code;
if ($this->on_status_code)
{
$return = call_user_func($this->on_status_code, $http_code, $url, $html, $this);
if (isset($return))
{
$html = $return;
}
if (!$html)
{
return false;
}
}
if ($http_code != 200)
{
// 如果是301、302跳转, 抓取跳转后的网页内容
if ($http_code == 301 || $http_code == 302)
{
$info = requests::$info;
if (isset($info['redirect_url']))
{
$url = $info['redirect_url'];
requests::$input_encoding = null;
$html = $this->request_url($url, $link);
if ($html && !empty($link['context_data']))
{
$html .= $link['context_data'];
}
}
else
{
return false;
}
}
else
{
if ($http_code == 407)
{
// 扔到队列头部去, 继续采集
$this->queue_rpush($link);
log::error("Failed to download page {$url}");
self::$collect_fail++;
}
elseif (in_array($http_code, array('0','502','503','429')))
{
// 采集次数加一
$link['try_num']++;
// 抓取次数 小于 允许抓取失败次数
if ( $link['try_num'] <= $link['max_try'] )
{
// 扔到队列头部去, 继续采集
$this->queue_rpush($link);
}
log::error("Failed to download page {$url}, retry({$link['try_num']})");
}
else
{
log::error("Failed to download page {$url}");
self::$collect_fail++;
}
log::error("HTTP CODE: {$http_code}");
return false;
}
}
// 爬取页面耗时时间
$time_run = round(microtime(true) - $time_start, 3);
log::debug("Success download page {$url} in {$time_run} s");
self::$collect_succ++;
return $html;
}
/**
* 分析提取HTML页面中的URL
*
* @param mixed $html HTML内容
* @param mixed $collect_url 抓取的URL, 用来拼凑完整页面的URL
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-18 10:17
*/
public function get_urls($html, $collect_url, $depth = 0)
{
//--------------------------------------------------------------------------------
// 正则匹配出页面中的URL
//--------------------------------------------------------------------------------
$urls = selector::select($html, '//a/@href');
//preg_match_all("/<a.*href=[\"']{0,1}(.*)[\"']{0,1}[> \r\n\t]{1,}/isU", $html, $matchs);
//$urls = array();
//if (!empty($matchs[1]))
//{
//foreach ($matchs[1] as $url)
//{
//$urls[] = str_replace(array("\"", "'",'&amp;'), array("",'','&'), $url);
//}
//}
if (empty($urls))
{
return false;
}
// 如果页面上只有一个url,要把他转为数组,否则下面会报警告
if (!is_array($urls))
{
$urls = array($urls);
}
foreach ($urls as $key=>$url)
{
$urls[$key] = str_replace(array("\"", "'",'&amp;'), array("",'','&'), $url);
}
//--------------------------------------------------------------------------------
// 过滤和拼凑URL
//--------------------------------------------------------------------------------
// 去除重复的RUL
$urls = array_unique($urls);
foreach ($urls as $k=>$url)
{
$url = trim($url);
if (empty($url))
{
continue;
}
$val = $this->fill_url($url, $collect_url);
if ($val)
{
$urls[$k] = $val;
}
else
{
unset($urls[$k]);
}
}
if (empty($urls))
{
return false;
}
//--------------------------------------------------------------------------------
// 把抓取到的URL放入队列
//--------------------------------------------------------------------------------
foreach ($urls as $url)
{
if ($this->on_fetch_url)
{
$return = call_user_func($this->on_fetch_url, $url, $this);
$url = isset($return) ? $return : $url;
unset($return);
// 如果 on_fetch_url 返回 false,此URL不入队列
if (!$url)
{
continue;
}
}
// 把当前页当做找到的url的Referer页
$options = array(
'headers' => array(
'Referer' => $collect_url,
)
);
$this->add_url($url, $options, $depth);
}
}
/**
* 获得完整的连接地址
*
* @param mixed $url 要检查的URL
* @param mixed $collect_url 从那个URL页面得到上面的URL
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-23 17:13
*/
public function fill_url($url, $collect_url)
{
$url = trim($url);
$collect_url = trim($collect_url);
// 排除JavaScript的连接
//if (strpos($url, "javascript:") !== false)
if( preg_match("@^(javascript:|#|'|\")@i", $url) || $url == '')
{
return false;
}
// 排除没有被解析成功的语言标签
if(substr($url, 0, 3) == '<%=')
{
return false;
}
$parse_url = @parse_url($collect_url);
if (empty($parse_url['scheme']) || empty($parse_url['host']))
{
return false;
}
// 过滤mailto、tel、sms、wechat、sinaweibo、weixin等协议
if (!in_array($parse_url['scheme'], array("http", "https")))
{
return false;
}
$scheme = $parse_url['scheme'];
$domain = $parse_url['host'];
$path = empty($parse_url['path']) ? '' : $parse_url['path'];
$base_url_path = $domain.$path;
$base_url_path = preg_replace("/\/([^\/]*)\.(.*)$/","/",$base_url_path);
$base_url_path = preg_replace("/\/$/",'',$base_url_path);
$i = $path_step = 0;
$dstr = $pstr = '';
$pos = strpos($url,'#');
if($pos > 0)
{
// 去掉#和后面的字符串
$url = substr($url, 0, $pos);
}
// 京东变态的都是 //www.jd.com/111.html
if(substr($url, 0, 2) == '//')
{
$url = str_replace("//", "", $url);
}
// /1234.html
elseif($url[0] == '/')
{
$url = $domain.$url;
}
// ./1234.html、../1234.html 这种类型的
elseif($url[0] == '.')
{
if(!isset($url[2]))
{
return false;
}
else
{
$urls = explode('/',$url);
foreach($urls as $u)
{
if( $u == '..' )
{
$path_step++;
}
// 遇到 ., 不知道为什么不直接写$u == '.', 貌似一样的
else if( $i < count($urls)-1 )
{
$dstr .= $urls[$i].'/';
}
else
{
$dstr .= $urls[$i];
}
$i++;
}
$urls = explode('/',$base_url_path);
if(count($urls) <= $path_step)
{
return false;
}
else
{
$pstr = '';
for($i=0;$i<count($urls)-$path_step;$i++){ $pstr .= $urls[$i].'/'; }
$url = $pstr.$dstr;
}
}
}
else
{
if( strtolower(substr($url, 0, 7))=='http://' )
{
$url = preg_replace('#^http://#i','',$url);
$scheme = "http";
}
else if( strtolower(substr($url, 0, 8))=='https://' )
{
$url = preg_replace('#^https://#i','',$url);
$scheme = "https";
}
else
{
$url = $base_url_path.'/'.$url;
}
}
// 两个 / 或以上的替换成一个 /
$url = preg_replace('@/{1,}@i', '/', $url);
$url = $scheme.'://'.$url;
//echo $url;exit("\n");
$parse_url = @parse_url($url);
$domain = empty($parse_url['host']) ? $domain : $parse_url['host'];
// 如果host不为空, 判断是不是要爬取的域名
if (!empty($parse_url['host']))
{
//排除非域名下的url以提高爬取速度
if (!in_array($parse_url['host'], self::$configs['domains']))
{
return false;
}
}
return $url;
}
/**
* 连接对象压缩
*
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-11-05 18:58
*/
public function link_compress($link)
{
if (empty($link['url_type']))
{
unset($link['url_type']);
}
if (empty($link['method']) || strtolower($link['method']) == 'get')
{
unset($link['method']);
}
if (empty($link['headers']))
{
unset($link['headers']);
}
if (empty($link['params']))
{
unset($link['params']);
}
if (empty($link['context_data']))
{
unset($link['context_data']);
}
if (empty($link['proxies']))
{
unset($link['proxies']);
}
if (empty($link['try_num']))
{
unset($link['try_num']);
}
if (empty($link['max_try']))
{
unset($link['max_try']);
}
if (empty($link['depth']))
{
unset($link['depth']);
}
//$json = json_encode($link);
//$json = gzdeflate($json);
return $link;
}
/**
* 连接对象解压缩
*
* @param mixed $link
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-11-05 18:58
*/
public function link_uncompress($link)
{
$link = array(
'url' => isset($link['url']) ? $link['url'] : '',
'url_type' => isset($link['url_type']) ? $link['url_type'] : '',
'method' => isset($link['method']) ? $link['method'] : 'get',
'headers' => isset($link['headers']) ? $link['headers'] : array(),
'params' => isset($link['params']) ? $link['params'] : array(),
'context_data' => isset($link['context_data']) ? $link['context_data'] : '',
'proxies' => isset($link['proxies']) ? $link['proxies'] : self::$configs['proxies'],
'try_num' => isset($link['try_num']) ? $link['try_num'] : 0,
'max_try' => isset($link['max_try']) ? $link['max_try'] : self::$configs['max_try'],
'depth' => isset($link['depth']) ? $link['depth'] : 0,
);
return $link;
}
/**
* 分析提取HTML页面中的字段
*
* @param mixed $html
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-18 10:17
*/
public function get_html_fields($html, $url, $page)
{
$fields = $this->get_fields(self::$configs['fields'], $html, $url, $page);
if (!empty($fields))
{
if ($this->on_extract_page)
{
$return = call_user_func($this->on_extract_page, $page, $fields);
if (!isset($return))
{
log::warn("on_extract_page return value can't be empty");
}
elseif (!is_array($return))
{
log::warn("on_extract_page return value must be an array");
}
else
{
$fields = $return;
}
}
if (isset($fields) && is_array($fields))
{
$fields_num = $this->incr_fields_num();
if (self::$configs['max_fields'] != 0 && $fields_num > self::$configs['max_fields'])
{
exit(0);
}
if (version_compare(PHP_VERSION,'5.4.0','<'))
{
$fields_str = json_encode($fields);
$fields_str = preg_replace_callback( "#\\\u([0-9a-f]{4})#i", function($matchs) {
return iconv('UCS-2BE', 'UTF-8', pack('H4', $matchs[1]));
}, $fields_str );
}
else
{
$fields_str = json_encode($fields, JSON_UNESCAPED_UNICODE);
}
if (util::is_win())
{
$fields_str = mb_convert_encoding($fields_str, 'gb2312', 'utf-8');
}
log::info("Result[{$fields_num}]: ".$fields_str);
// 如果设置了导出选项
if (!empty(self::$configs['export']))
{
self::$export_type = isset(self::$configs['export']['type']) ? self::$configs['export']['type'] : '';
if (self::$export_type == 'csv')
{
util::put_file(self::$export_file, util::format_csv($fields)."\n", FILE_APPEND);
}
elseif (self::$export_type == 'sql')
{
$sql = db::insert(self::$export_table, $fields, true);
util::put_file(self::$export_file, $sql.";\n", FILE_APPEND);
}
elseif (self::$export_type == 'db')
{
db::insert(self::$export_table, $fields);
}
}
}
}
}
/**
* 根据配置提取HTML代码块中的字段
*
* @param mixed $confs
* @param mixed $html
* @param mixed $page
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-23 17:13
*/
public function get_fields($confs, $html, $url, $page)
{
$fields = array();
foreach ($confs as $conf)
{
// 当前field抽取到的内容是否是有多项
$repeated = isset($conf['repeated']) && $conf['repeated'] ? true : false;
// 当前field抽取到的内容是否必须有值
$required = isset($conf['required']) && $conf['required'] ? true : false;
if (empty($conf['name']))
{
log::error("The field name is null, please check your \"fields\" and add the name of the field\n");
exit;
}
$values = array();
// 如果定义抽取规则
if (!empty($conf['selector']))
{
// 如果这个field是上一个field的附带连接
if (isset($conf['source_type']) && $conf['source_type']=='attached_url')
{
// 取出上个field的内容作为连接, 内容分页是不进队列直接下载网页的
if (!empty($fields[$conf['attached_url']]))
{
$collect_url = $this->fill_url($fields[$conf['attached_url']], $url);
//log::debug("Find attached content page: {$collect_url}");
$link['url'] = $collect_url;
$link = $this->link_uncompress($link);
requests::$input_encoding = null;
$html = $this->request_url($collect_url, $link);
// 在一个attached_url对应的网页下载完成之后调用. 主要用来对下载的网页进行处理.
if ($this->on_download_attached_page)
{
$return = call_user_func($this->on_download_attached_page, $html, $this);
if (isset($return))
{
$html = $return;
}
}
// 请求获取完分页数据后把连接删除了
unset($fields[$conf['attached_url']]);
}
}
// 没有设置抽取规则的类型 或者 设置为 xpath
if (!isset($conf['selector_type']) || $conf['selector_type']=='xpath')
{
$values = $this->get_fields_xpath($html, $conf['selector'], $conf['name']);
}
elseif ($conf['selector_type']=='css')
{
$values = $this->get_fields_css($html, $conf['selector'], $conf['name']);
}
elseif ($conf['selector_type']=='regex')
{
$values = $this->get_fields_regex($html, $conf['selector'], $conf['name']);
}
// field不为空而且存在子配置
if (!empty($values) && !empty($conf['children']))
{
$child_values = array();
// 父项抽取到的html作为子项的提取内容
foreach ($values as $child_html)
{
// 递归调用本方法, 所以多少子项目都支持
$child_value = $this->get_fields($conf['children'], $child_html, $url, $page);
if (!empty($child_value))
{
$child_values[] = $child_value;
}
}
// 有子项就存子项的数组, 没有就存HTML代码块
if (!empty($child_values))
{
$values = $child_values;
}
}
}
if (empty($values))
{
// 如果值为空而且值设置为必须项, 跳出foreach循环
if ($required)
{
// 清空整个 fields
$fields = array();
break;
}
// 避免内容分页时attached_url拼接时候string + array了
$fields[$conf['name']] = '';
//$fields[$conf['name']] = array();
}
else
{
if (is_array($values))
{
if ($repeated)
{
$fields[$conf['name']] = $values;
}
else
{
$fields[$conf['name']] = $values[0];
}
}
else
{
$fields[$conf['name']] = $values;
}
// 不重复抽取则只取第一个元素
//$fields[$conf['name']] = $repeated ? $values : $values[0];
}
}
if (!empty($fields))
{
foreach ($fields as $fieldname => $data)
{
$pattern = "/<img.*src=[\"']{0,1}(.*)[\"']{0,1}[> \r\n\t]{1,}/isU";
/*$pattern = "/<img.*?src=[\'|\"](.*?(?:[\.gif|\.jpg|\.jpeg|\.png]))[\'|\"].*?[\/]?>/i"; */
// 在抽取到field内容之后调用, 对其中包含的img标签进行回调处理
if ($this->on_handle_img && preg_match($pattern, $data))
{
$return = call_user_func($this->on_handle_img, $fieldname, $data);
if (!isset($return))
{
log::warn("on_handle_img return value can't be empty\n");
}
else
{
// 有数据才会执行 on_handle_img 方法, 所以这里不要被替换没了
$data = $return;
}
}
// 当一个field的内容被抽取到后进行的回调, 在此回调中可以对网页中抽取的内容作进一步处理
if ($this->on_extract_field)
{
$return = call_user_func($this->on_extract_field, $fieldname, $data, $page);
if (!isset($return))
{
log::warn("on_extract_field return value can't be empty\n");
}
else
{
// 有数据才会执行 on_extract_field 方法, 所以这里不要被替换没了
$fields[$fieldname] = $return;
}
}
}
}
return $fields;
}
/**
* 验证导出
*
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-10-02 23:37
*/
public function check_export()
{
// 如果设置了导出选项
if (!empty(self::$configs['export']))
{
if (self::$export_type == 'csv')
{
if (empty(self::$export_file))
{
log::error("Export data into CSV files need to Set the file path.");
exit;
}
}
elseif (self::$export_type == 'sql')
{
if (empty(self::$export_file))
{
log::error("Export data into SQL files need to Set the file path.");
exit;
}
}
elseif (self::$export_type == 'db')
{
if (!function_exists('mysqli_connect'))
{
log::error("Export data to a database need Mysql support, Error: Unable to load mysqli extension.");
exit;
}
if (empty(self::$db_config))
{
log::error("Export data to a database need Mysql support, Error: You not set a config array for connect.");
exit;
}
$config = self::$db_config;
@mysqli_connect($config['host'], $config['user'], $config['pass'], $config['name'], $config['port']);
if(mysqli_connect_errno())
{
log::error("Export data to a database need Mysql support, Error: ".mysqli_connect_error());
exit;
}
db::set_connect('default', $config);
db::_init();
if (!db::table_exists(self::$export_table))
{
log::error("Table ".self::$export_table." does not exist");
exit;
}
}
}
}
public function check_cache()
{
if (!self::$use_redis || self::$save_running_state)
{
return false;
}
//if (queue::exists("collect_queue"))
$keys = queue::keys("*");
$count = count($keys);
if ($count != 0)
{
// After this operation, 4,318 kB of additional disk space will be used.
// Do you want to continue? [Y/n]
//$msg = "发现Redis中有采集数据, 是否继续执行, 不继续则清空Redis数据重新采集\n";
$msg = "Found that the data of Redis, no continue will empty Redis data start again\n";
$msg .= "Do you want to continue? [Y/n]";
fwrite(STDOUT, $msg);
$arg = strtolower(trim(fgets(STDIN)));
$arg = empty($arg) || !in_array($arg, array('y','n')) ? 'y' : $arg;
if ($arg == 'n')
{
foreach ($keys as $key)
{
$key = str_replace($GLOBALS['config']['redis']['prefix'].":", "", $key);
queue::del($key);
}
}
}
}
public function init_redis()
{
if (!self::$use_redis)
{
return false;
}
// 添加当前服务器到服务器列表
$this->add_server_list(self::$serverid, self::$tasknum);
// 删除当前服务器的任务状态
// 对于被强制退出的进程有用
for ($i = 1; $i <= self::$tasknum; $i++)
{
$this->del_task_status(self::$serverid, $i);
}
}
/**
* 设置任务状态, 主进程和子进程每成功采集一个页面后调用
*
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-10-30 23:56
*/
public function set_task_status()
{
// 每采集成功一个页面, 生成当前进程状态到文件, 供主进程使用
$mem = round(memory_get_usage(true)/(1024*1024),2);
$use_time = microtime(true) - self::$time_start;
$speed = round((self::$collect_succ + self::$collect_fail) / $use_time, 2);
$status = array(
'id' => self::$taskid,
'pid' => self::$taskpid,
'mem' => $mem,
'collect_succ' => self::$collect_succ,
'collect_fail' => self::$collect_fail,
'speed' => $speed,
);
$task_status = json_encode($status);
if (self::$use_redis)
{
$key = "server-".self::$serverid."-task_status-".self::$taskid;
queue::set($key, $task_status);
}
else
{
self::$task_status = array($task_status);
}
}
/**
* 删除任务状态
*
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-11-16 11:06
*/
public function del_task_status($serverid, $taskid)
{
if (!self::$use_redis)
{
return false;
}
$key = "server-{$serverid}-task_status-{$taskid}";
queue::del($key);
}
/**
* 获得任务状态, 主进程才会调用
*
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-10-30 23:56
*/
public function get_task_status($serverid, $taskid)
{
if (!self::$use_redis)
{
return false;
}
$key = "server-{$serverid}-task_status-{$taskid}";
$task_status = queue::get($key);
return $task_status;
}
/**
* 获得任务状态, 主进程才会调用
*
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-10-30 23:56
*/
public function get_task_status_list($serverid = 1, $tasknum)
{
$task_status = array();
if (self::$use_redis)
{
for ($i = 1; $i <= $tasknum; $i++)
{
$key = "server-{$serverid}-task_status-".$i;
$task_status[] = queue::get($key);
}
}
else
{
$task_status = self::$task_status;
}
return $task_status;
}
/**
* 添加当前服务器信息到服务器列表
*
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-11-16 11:06
*/
public function add_server_list($serverid, $tasknum)
{
if (!self::$use_redis)
{
return false;
}
// 更新服务器列表
$server_list_json = queue::get("server_list");
$server_list = array();
if (!$server_list_json)
{
$server_list[$serverid] = array(
'serverid' => $serverid,
'tasknum' => $tasknum,
'time' => time(),
);
}
else
{
$server_list = json_decode($server_list_json, true);
$server_list[$serverid] = array(
'serverid' => $serverid,
'tasknum' => $tasknum,
'time' => time(),
);
ksort($server_list);
}
queue::set("server_list", json_encode($server_list));
}
/**
* 从服务器列表中删除当前服务器信息
*
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-11-16 11:06
*/
public function del_server_list($serverid)
{
if (!self::$use_redis)
{
return false;
}
$server_list_json = queue::get("server_list");
$server_list = array();
if ($server_list_json)
{
$server_list = json_decode($server_list_json, true);
if (isset($server_list[$serverid]))
{
unset($server_list[$serverid]);
}
// 删除完当前的任务列表如果还存在,就更新一下Redis
if (!empty($server_list))
{
ksort($server_list);
queue::set("server_list", json_encode($server_list));
}
}
}
/**
* 获取等待爬取页面数量
*
* @param mixed $url
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-23 17:13
*/
public function get_collect_url_num()
{
if (self::$use_redis)
{
$count = queue::get("collect_urls_num");
}
else
{
$count = self::$collect_urls_num;
}
return $count;
}
/**
* 获取已经爬取页面数量
*
* @param mixed $url
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-23 17:13
*/
public function get_collected_url_num()
{
if (self::$use_redis)
{
$count = queue::get("collected_urls_num");
}
else
{
$count = self::$collected_urls_num;
}
return $count;
}
/**
* 已采集页面数量加一
*
* @param mixed $url
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-23 17:13
*/
public function incr_collected_url_num($url)
{
if (self::$use_redis)
{
queue::incr("collected_urls_num");
}
else
{
self::$collected_urls_num++;
}
}
/**
* 从队列左边插入
*
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-23 17:13
*/
public function queue_lpush($link = array(), $allowed_repeat = false)
{
if (empty($link) || empty($link['url']))
{
return false;
}
$url = $link['url'];
$link = $this->link_compress($link);
$status = false;
if (self::$use_redis)
{
$key = "collect_urls-".md5($url);
$lock = "lock-".$key;
// 加锁: 一个进程一个进程轮流处理
if (queue::lock($lock))
{
$exists = queue::exists($key);
// 不存在或者当然URL可重复入
if (!$exists || $allowed_repeat)
{
// 待爬取网页记录数加一
queue::incr("collect_urls_num");
// 先标记为待爬取网页
queue::set($key, time());
// 入队列
$link = json_encode($link);
queue::lpush("collect_queue", $link);
$status = true;
}
// 解锁
queue::unlock($lock);
}
}
else
{
$key = md5($url);
if (!array_key_exists($key, self::$collect_urls))
{
self::$collect_urls_num++;
self::$collect_urls[$key] = time();
array_push(self::$collect_queue, $link);
$status = true;
}
}
return $status;
}
/**
* 从队列右边插入
*
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-23 17:13
*/
public function queue_rpush($link = array(), $allowed_repeat = false)
{
if (empty($link) || empty($link['url']))
{
return false;
}
$url = $link['url'];
$status = false;
if (self::$use_redis)
{
$key = "collect_urls-".md5($url);
$lock = "lock-".$key;
// 加锁: 一个进程一个进程轮流处理
if (queue::lock($lock))
{
$exists = queue::exists($key);
// 不存在或者当然URL可重复入
if (!$exists || $allowed_repeat)
{
// 待爬取网页记录数加一
queue::incr("collect_urls_num");
// 先标记为待爬取网页
queue::set($key, time());
// 入队列
$link = json_encode($link);
queue::rpush("collect_queue", $link);
$status = true;
}
// 解锁
queue::unlock($lock);
}
}
else
{
$key = md5($url);
if (!array_key_exists($key, self::$collect_urls))
{
self::$collect_urls_num++;
self::$collect_urls[$key] = time();
array_unshift(self::$collect_queue, $link);
$status = true;
}
}
return $status;
}
/**
* 从队列左边取出
* 后进先出
* 可以避免采集内容页有分页的时候采集失败数据拼凑不全
* 还可以按顺序采集列表页
*
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-23 17:13
*/
public function queue_lpop()
{
if (self::$use_redis)
{
$link = queue::lpop("collect_queue");
$link = json_decode($link, true);
}
else
{
$link = array_pop(self::$collect_queue);
}
return $link;
}
/**
* 从队列右边取出
*
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-23 17:13
*/
public function queue_rpop()
{
if (self::$use_redis)
{
$link = queue::rpop("collect_queue");
$link = json_decode($link, true);
}
else
{
$link = array_shift(self::$collect_queue);
}
return $link;
}
/**
* 队列长度
*
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-23 17:13
*/
public function queue_lsize()
{
if (self::$use_redis)
{
$lsize = queue::lsize("collect_queue");
}
else
{
$lsize = count(self::$collect_queue);
}
return $lsize;
}
/**
* 采集深度加一
*
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-23 17:13
*/
public function incr_depth_num($depth)
{
if (self::$use_redis)
{
$lock = "lock-depth_num";
// 锁2秒
if (queue::lock($lock, time(), 2))
{
if (queue::get("depth_num") < $depth)
{
queue::set("depth_num", $depth);
}
queue::unlock($lock);
}
}
else
{
if (self::$depth_num < $depth)
{
self::$depth_num = $depth;
}
}
}
/**
* 获得采集深度
*
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-23 17:13
*/
public function get_depth_num()
{
if (self::$use_redis)
{
$depth_num = queue::get("depth_num");
return $depth_num ? $depth_num : 0;
}
else
{
return self::$depth_num;
}
}
/**
* 提取到的field数目加一
*
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-23 17:13
*/
public function incr_fields_num()
{
if (self::$use_redis)
{
$fields_num = queue::incr("fields_num");
}
else
{
self::$fields_num++;
$fields_num = self::$fields_num;
}
return $fields_num;
}
/**
* 提取到的field数目
*
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-23 17:13
*/
public function get_fields_num()
{
if (self::$use_redis)
{
$fields_num = queue::get("fields_num");
}
else
{
$fields_num = self::$fields_num;
}
return $fields_num ? $fields_num : 0;
}
/**
* 采用xpath分析提取字段
*
* @param mixed $html
* @param mixed $selector
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-18 10:17
*/
public function get_fields_xpath($html, $selector, $fieldname)
{
$result = selector::select($html, $selector);
if (selector::$error)
{
log::error("Field(\"{$fieldname}\") ".selector::$error."\n");
}
return $result;
}
/**
* 采用正则分析提取字段
*
* @param mixed $html
* @param mixed $selector
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-18 10:17
*/
public function get_fields_regex($html, $selector, $fieldname)
{
$result = selector::select($html, $selector, 'regex');
if (selector::$error)
{
log::error("Field(\"{$fieldname}\") ".selector::$error."\n");
}
return $result;
}
/**
* 采用CSS选择器提取字段
*
* @param mixed $html
* @param mixed $selector
* @param mixed $fieldname
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-18 10:17
*/
public function get_fields_css($html, $selector, $fieldname)
{
$result = selector::select($html, $selector, 'css');
if (selector::$error)
{
log::error("Field(\"{$fieldname}\") ".selector::$error."\n");
}
return $result;
}
/**
* 清空shell输出内容
*
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-11-16 11:06
*/
public function clear_echo()
{
$arr = array(27, 91, 72, 27, 91, 50, 74);
foreach ($arr as $a)
{
print chr($a);
}
//array_map(create_function('$a', 'print chr($a);'), array(27, 91, 72, 27, 91, 50, 74));
}
/**
* 替换shell输出内容
*
* @param mixed $message
* @param mixed $force_clear_lines
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-11-16 11:06
*/
public function replace_echo($message, $force_clear_lines = NULL)
{
static $last_lines = 0;
if(!is_null($force_clear_lines))
{
$last_lines = $force_clear_lines;
}
// 获取终端宽度
$toss = $status = null;
$term_width = exec('tput cols', $toss, $status);
if($status || empty($term_width))
{
$term_width = 64; // Arbitrary fall-back term width.
}
$line_count = 0;
foreach(explode("\n", $message) as $line)
{
$line_count += count(str_split($line, $term_width));
}
// Erasure MAGIC: Clear as many lines as the last output had.
for($i = 0; $i < $last_lines; $i++)
{
// Return to the beginning of the line
echo "\r";
// Erase to the end of the line
echo "\033[K";
// Move cursor Up a line
echo "\033[1A";
// Return to the beginning of the line
echo "\r";
// Erase to the end of the line
echo "\033[K";
// Return to the beginning of the line
echo "\r";
// Can be consolodated into
// echo "\r\033[K\033[1A\r\033[K\r";
}
$last_lines = $line_count;
echo $message."\n";
}
/**
* 展示启动界面, Windows 不会到这里来
* @return void
*/
public function display_ui()
{
$loadavg = sys_getloadavg();
foreach ($loadavg as $k=>$v)
{
$loadavg[$k] = round($v, 2);
}
$display_str = "\033[1A\n\033[K-----------------------------\033[47;30m PHPSPIDER \033[0m-----------------------------\n\033[0m";
//$display_str = "-----------------------------\033[47;30m PHPSPIDER \033[0m-----------------------------\n\033[0m";
$run_time_str = util::time2second(time()-self::$time_start, false);
$display_str .= 'PHPSpider version:' . self::VERSION . " PHP version:" . PHP_VERSION . "\n";
$display_str .= 'start time:'. date('Y-m-d H:i:s', self::$time_start).' run ' . $run_time_str . " \n";
$display_str .= 'spider name: ' . self::$configs['name'] . "\n";
if (self::$multiserver)
{
$display_str .= 'server id: ' . self::$serverid."\n";
}
$display_str .= 'task number: ' . self::$tasknum . "\n";
$display_str .= 'load average: ' . implode(", ", $loadavg) . "\n";
$display_str .= "document: https://doc.phpspider.org\n";
$display_str .= $this->display_task_ui();
if (self::$multiserver)
{
$display_str .= $this->display_server_ui();
}
$display_str .= $this->display_collect_ui();
// 清屏
//$this->clear_echo();
// 返回到第一行,第一列
//echo "\033[0;0H";
$display_str .= "---------------------------------------------------------------------\n";
$display_str .= "Press Ctrl-C to quit. Start success.";
if (self::$terminate)
{
$display_str .= "\n\033[33mWait for the process exits...\033[0m";
}
//echo $display_str;
$this->replace_echo($display_str);
}
public function display_task_ui()
{
$display_str = "-------------------------------\033[47;30m TASKS \033[0m-------------------------------\n";
$display_str .= "\033[47;30mtaskid\033[0m". str_pad('', self::$taskid_length+2-strlen('taskid')).
"\033[47;30mtaskpid\033[0m". str_pad('', self::$pid_length+2-strlen('taskpid')).
"\033[47;30mmem\033[0m". str_pad('', self::$mem_length+2-strlen('mem')).
"\033[47;30mcollect succ\033[0m". str_pad('', self::$urls_length-strlen('collect succ')).
"\033[47;30mcollect fail\033[0m". str_pad('', self::$urls_length-strlen('collect fail')).
"\033[47;30mspeed\033[0m". str_pad('', self::$speed_length+2-strlen('speed')).
"\n";
// "\033[32;40m [OK] \033[0m"
$task_status = $this->get_task_status_list(self::$serverid, self::$tasknum);
foreach ($task_status as $json)
{
$task = json_decode($json, true);
if (empty($task))
{
continue;
}
$display_str .= str_pad($task['id'], self::$taskid_length+2).
str_pad($task['pid'], self::$pid_length+2).
str_pad($task['mem']."MB", self::$mem_length+2).
str_pad($task['collect_succ'], self::$urls_length).
str_pad($task['collect_fail'], self::$urls_length).
str_pad($task['speed']."/s", self::$speed_length+2).
"\n";
}
//echo "\033[9;0H";
return $display_str;
}
public function display_server_ui()
{
$display_str = "-------------------------------\033[47;30m SERVER \033[0m------------------------------\n";
$display_str .= "\033[47;30mserver\033[0m". str_pad('', self::$server_length+2-strlen('serverid')).
"\033[47;30mtasknum\033[0m". str_pad('', self::$tasknum_length+2-strlen('tasknum')).
"\033[47;30mmem\033[0m". str_pad('', self::$mem_length+2-strlen('mem')).
"\033[47;30mcollect succ\033[0m". str_pad('', self::$urls_length-strlen('collect succ')).
"\033[47;30mcollect fail\033[0m". str_pad('', self::$urls_length-strlen('collect fail')).
"\033[47;30mspeed\033[0m". str_pad('', self::$speed_length+2-strlen('speed')).
"\n";
$server_list_json = queue::get("server_list");
$server_list = json_decode($server_list_json, true);
foreach ($server_list as $server)
{
$serverid = $server['serverid'];
$tasknum = $server['tasknum'];
$mem = 0;
$speed = 0;
$collect_succ = $collect_fail = 0;
$task_status = $this->get_task_status_list($serverid, $tasknum);
foreach ($task_status as $json)
{
$task = json_decode($json, true);
if (empty($task))
{
continue;
}
$mem += $task['mem'];
$speed += $task['speed'];
$collect_fail += $task['collect_fail'];
$collect_succ += $task['collect_succ'];
}
$display_str .= str_pad($serverid, self::$server_length).
str_pad($tasknum, self::$tasknum_length+2).
str_pad($mem."MB", self::$mem_length+2).
str_pad($collect_succ, self::$urls_length).
str_pad($collect_fail, self::$urls_length).
str_pad($speed."/s", self::$speed_length+2).
"\n";
}
return $display_str;
}
public function display_collect_ui()
{
$display_str = "---------------------------\033[47;30m COLLECT STATUS \033[0m--------------------------\n";
$display_str .= "\033[47;30mfind pages\033[0m". str_pad('', 16-strlen('find pages')).
"\033[47;30mqueue\033[0m". str_pad('', 14-strlen('queue')).
"\033[47;30mcollected\033[0m". str_pad('', 15-strlen('collected')).
"\033[47;30mfields\033[0m". str_pad('', 15-strlen('fields')).
"\033[47;30mdepth\033[0m". str_pad('', 12-strlen('depth')).
"\n";
$collect = $this->get_collect_url_num();
$collected = $this->get_collected_url_num();
$queue = $this->queue_lsize();
$fields = $this->get_fields_num();
$depth = $this->get_depth_num();
$display_str .= str_pad($collect, 16);
$display_str .= str_pad($queue, 14);
$display_str .= str_pad($collected, 15);
$display_str .= str_pad($fields, 15);
$display_str .= str_pad($depth, 12);
$display_str .= "\n";
return $display_str;
}
/**
* 判断是否附件文件
*
* @return void
* @author seatle <seatle@foxmail.com>
* @created time :2016-09-23 17:13
*/
//public function is_attachment_file($url)
//{
//$mime_types = $GLOBALS['config']['mimetype'];
//$mime_types_flip = array_flip($mime_types);
//$pathinfo = pathinfo($url);
//$fileext = isset($pathinfo['extension']) ? $pathinfo['extension'] : '';
//$fileinfo = array();
//// 存在文件后缀并且是配置里面的后缀
//if (!empty($fileext) && isset($mime_types_flip[$fileext]))
//{
//stream_context_set_default(
//array(
//'http' => array(
//'method' => 'HEAD'
//)
//)
//);
//// 代理和Cookie以后实现, 方法和 file_get_contents 一样 使用 stream_context_create 设置
//$headers = get_headers($url, 1);
//if (strpos($headers[0], '302'))
//{
//$url = $headers['Location'];
//$headers = get_headers($url, 1);
//}
////print_r($headers);
//$fileinfo = array(
//'basename' => isset($pathinfo['basename']) ? $pathinfo['basename'] : '',
//'filename' => isset($pathinfo['filename']) ? $pathinfo['filename'] : '',
//'fileext' => isset($pathinfo['extension']) ? $pathinfo['extension'] : '',
//'filesize' => isset($headers['Content-Length']) ? $headers['Content-Length'] : 0,
//'atime' => isset($headers['Date']) ? strtotime($headers['Date']) : time(),
//'mtime' => isset($headers['Last-Modified']) ? strtotime($headers['Last-Modified']) : time(),
//);
//$mime_type = 'html';
//$content_type = isset($headers['Content-Type']) ? $headers['Content-Type'] : '';
//if (!empty($content_type))
//{
//$mime_type = isset($GLOBALS['config']['mimetype'][$content_type]) ? $GLOBALS['config']['mimetype'][$content_type] : $mime_type;
//}
//$mime_types_flip = array_flip($mime_types);
//// 判断一下是不是文件名被加什么后缀了, 比如 http://www.xxxx.com/test.jpg?token=xxxxx
//if (!isset($mime_types_flip[$fileinfo['fileext']]))
//{
//$fileinfo['fileext'] = $mime_type;
//$fileinfo['basename'] = $fileinfo['filename'].'.'.$mime_type;
//}
//}
//return $fileinfo;
//}
}