因为网站比较敏感, 所以具体网站就不说了, 直接说逻辑部分
为了降低数据提取的错误率, 所以使用了python的slimit库对js代码进行提取处理
对网站源码的JavaScript进行分析后, 发现在其中一个script标签内的js代码是利用抽取混淆的, 并用flashvars开头的变量存储
首先用python将该js代码进行提取
response = requests.get(url=url, proxies=proxies)
# Pick the first script text node that mentions 'flashvars'.
# next() raises StopIteration when no script text matches.
script = next(
    text
    for text in response.html.xpath('//script//text()')
    if 'flashvars' in text
)
将该部分代码格式化后可以看到键名为mediaDefinitions的值是数组, 数组内存储着videoUrl
在这里可以看到, 链接被拆分成了多个片段, 将这些片段拼接后即可还原真实地址
下面会使用到python的slimit库的ast进行还原
# Convert the JavaScript source into an AST.
js_parser = Parser()
tree = js_parser.parse(script)
通过slimit的Parser类的parse方法, 将js代码转换为ast结构树
获取到结构树后, 需要自定义类, 并继承ASTVisitor, 自定义访问者遍历节点对节点进行抽取
下面代码我先对mediaDefinitions和qualityItems对象进行抽取还原
class VarStatement_Visitor(ASTVisitor):
    """Visitor that turns selected ``var`` declarations into plain dicts.

    Only the first declaration of each ``var`` statement is inspected.
    Every rebuilt dict is appended to the module-level ``flashvars`` list.
    """

    def visit_VarStatement(self, node):
        """Extract media entries from ``flashvars*`` / ``qualityItems*`` vars.

        For a variable whose name contains ``flashvars``, each object in the
        ``"mediaDefinitions"`` array becomes one dict. For a variable whose
        name contains ``qualityItems``, each object of the array becomes one
        dict. Keys and string values have their surrounding quote characters
        stripped via ``[1:-1]``.
        """
        name_node, value_node = node.children()[0].children()
        var_name = name_node.value

        if 'flashvars' in var_name:
            for prop in value_node.properties:
                key_node, val_node = prop.children()
                # Only the mediaDefinitions array is of interest here.
                if key_node.value != '"mediaDefinitions"':
                    continue
                for entry in val_node.items:
                    record = {}
                    for field in entry.properties:
                        field_key, field_val = field.children()
                        plain_key = field_key.value[1:-1]
                        if isinstance(field_val, ast.Array):
                            record[plain_key] = [
                                element.value for element in field_val.items
                            ]
                        elif field_key.value in ('"defaultQuality"', '"remote"'):
                            # These two values are stored without stripping
                            # the first and last character.
                            record[plain_key] = field_val.value
                        else:
                            record[plain_key] = field_val.value[1:-1]
                    flashvars.append(record)

        if 'qualityItems' in var_name:
            for entry in value_node.items:
                record = {}
                for field in entry.properties:
                    field_key, field_val = field.children()
                    plain_key = field_key.value[1:-1]
                    if isinstance(field_val, ast.Number):
                        record[plain_key] = field_val.value
                    else:
                        # Undo the escaped forward slashes in string values.
                        record[plain_key] = field_val.value[1:-1].replace('\\/', '/')
                flashvars.append(record)
还原mediaDefinitions和qualityItems对象后, 继续创建访问者对抽取链接进行还原
进入的节点仍然是VarStatement。因为视频的链接由数量不定的多个变量拼接而成, 所以下面使用递归的方式, 通过get_Identifier方法依次获取被抽取的变量并进行还原
class Media_Visitor(ASTVisitor):
    """Visitor that rebuilds the split-up video URL of one media entry.

    While walking the tree it records, in ``self.identifier``, every
    variable whose initializer is a string literal or a two-operand
    expression of string literals. When it meets the variable whose name
    contains ``media_<i>``, it resolves the operands of that concatenation
    through the recorded mapping and writes the joined text to
    ``flashvars[i]['videoUrl']``.
    """

    def __init__(self, i, *args, **kwargs):
        # Fix: ``super(*args, **kwargs)`` merely built a proxy object and
        # never ran the base-class initializer.
        super().__init__(*args, **kwargs)
        # Index of the media entry (position in ``flashvars``).
        self.i = i
        # Variable name -> decoded string fragment.
        self.identifier = {}
        # Resolved fragments of the URL, collected right to left.
        self.identifiers_list = []

    def get_Identifier(self, node, identifiers_list):
        """Append the resolved operands of ``node`` to ``identifiers_list``.

        The right operand is appended first, then the left side is followed
        recursively while it is itself a binary operation, so the fragments
        end up in reverse order.

        Raises:
            KeyError: if an operand name was not recorded in
                ``self.identifier``.
        """
        left, right = node.children()
        identifiers_list.append(self.identifier[right.value])
        if isinstance(left, ast.BinOp):
            self.get_Identifier(left, identifiers_list)
        else:
            identifiers_list.append(self.identifier[left.value])

    def visit_VarStatement(self, node):
        """Record string fragments, or assemble the URL for ``media_<i>``.

        Only the first declaration of the ``var`` statement is inspected.
        """
        name_node, init_node = node.children()[0].children()
        var_name = name_node.value

        # NOTE(review): this is a substring match, so ``media_1`` would also
        # match ``media_10`` - confirm that at most ten media variables exist.
        if 'media_' + str(self.i) in var_name:
            self.get_Identifier(init_node, self.identifiers_list)
            # Fragments were collected right to left; reverse before joining.
            flashvars[self.i]['videoUrl'] = ''.join(self.identifiers_list[::-1])
            return

        # NOTE(review): a declaration without an initializer presumably
        # yields None here; skip it instead of failing on ``.children()``.
        if init_node is None:
            return

        if isinstance(init_node, ast.String):
            self.identifier[var_name] = init_node.value[1:-1]
            return

        operands = init_node.children()
        if len(operands) == 2 and all(
            isinstance(operand, ast.String) for operand in operands
        ):
            # Two string literals combined: strip the quotes and join them.
            self.identifier[var_name] = ''.join(
                operand.value[1:-1] for operand in operands
            )
下面是完整代码:
import os
import re
import time
import traceback

import requests_html
# Judge node types
from slimit import ast
# Turn JavaScript source into an AST
from slimit.parser import Parser
# Base class for the custom visitors
from slimit.visitors.nodevisitor import ASTVisitor

requests = requests_html.HTMLSession()
# Media dicts rebuilt from the page; filled by the visitors below.
flashvars = []


class VarStatement_Visitor(ASTVisitor):
    """Visitor that turns selected ``var`` declarations into plain dicts.

    Only the first declaration of each ``var`` statement is inspected.
    Every rebuilt dict is appended to the module-level ``flashvars`` list.
    """

    def visit_VarStatement(self, node):
        """Extract media entries from ``flashvars*`` / ``qualityItems*`` vars.

        Keys and string values have their surrounding quote characters
        stripped via ``[1:-1]``.
        """
        name_node, value_node = node.children()[0].children()
        var_name = name_node.value

        if 'flashvars' in var_name:
            for prop in value_node.properties:
                key_node, val_node = prop.children()
                # Only the mediaDefinitions array is of interest here.
                if key_node.value != '"mediaDefinitions"':
                    continue
                for entry in val_node.items:
                    record = {}
                    for field in entry.properties:
                        field_key, field_val = field.children()
                        plain_key = field_key.value[1:-1]
                        if isinstance(field_val, ast.Array):
                            record[plain_key] = [
                                element.value for element in field_val.items
                            ]
                        elif field_key.value in ('"defaultQuality"', '"remote"'):
                            # These two values are stored without stripping
                            # the first and last character.
                            record[plain_key] = field_val.value
                        else:
                            record[plain_key] = field_val.value[1:-1]
                    flashvars.append(record)

        if 'qualityItems' in var_name:
            for entry in value_node.items:
                record = {}
                for field in entry.properties:
                    field_key, field_val = field.children()
                    plain_key = field_key.value[1:-1]
                    if isinstance(field_val, ast.Number):
                        record[plain_key] = field_val.value
                    else:
                        # Undo the escaped forward slashes in string values.
                        record[plain_key] = field_val.value[1:-1].replace('\\/', '/')
                flashvars.append(record)


class Media_Visitor(ASTVisitor):
    """Visitor that rebuilds the split-up video URL of one media entry.

    While walking the tree it records, in ``self.identifier``, every
    variable whose initializer is a string literal or a two-operand
    expression of string literals. When it meets the variable whose name
    contains ``media_<i>``, it resolves the operands of that concatenation
    through the recorded mapping and writes the joined text to
    ``flashvars[i]['videoUrl']``.
    """

    def __init__(self, i, *args, **kwargs):
        # Fix: ``super(*args, **kwargs)`` merely built a proxy object and
        # never ran the base-class initializer.
        super().__init__(*args, **kwargs)
        # Index of the media entry (position in ``flashvars``).
        self.i = i
        # Variable name -> decoded string fragment.
        self.identifier = {}
        # Resolved fragments of the URL, collected right to left.
        self.identifiers_list = []

    def get_Identifier(self, node, identifiers_list):
        """Append the resolved operands of ``node`` to ``identifiers_list``.

        The right operand is appended first, then the left side is followed
        recursively while it is itself a binary operation, so the fragments
        end up in reverse order.

        Raises:
            KeyError: if an operand name was not recorded in
                ``self.identifier``.
        """
        left, right = node.children()
        identifiers_list.append(self.identifier[right.value])
        if isinstance(left, ast.BinOp):
            self.get_Identifier(left, identifiers_list)
        else:
            identifiers_list.append(self.identifier[left.value])

    def visit_VarStatement(self, node):
        """Record string fragments, or assemble the URL for ``media_<i>``.

        Only the first declaration of the ``var`` statement is inspected.
        """
        name_node, init_node = node.children()[0].children()
        var_name = name_node.value

        # NOTE(review): this is a substring match, so ``media_1`` would also
        # match ``media_10`` - confirm that at most ten media variables exist.
        if 'media_' + str(self.i) in var_name:
            self.get_Identifier(init_node, self.identifiers_list)
            # Fragments were collected right to left; reverse before joining.
            flashvars[self.i]['videoUrl'] = ''.join(self.identifiers_list[::-1])
            return

        # NOTE(review): a declaration without an initializer presumably
        # yields None here; skip it instead of failing on ``.children()``.
        if init_node is None:
            return

        if isinstance(init_node, ast.String):
            self.identifier[var_name] = init_node.value[1:-1]
            return

        operands = init_node.children()
        if len(operands) == 2 and all(
            isinstance(operand, ast.String) for operand in operands
        ):
            # Two string literals combined: strip the quotes and join them.
            self.identifier[var_name] = ''.join(
                operand.value[1:-1] for operand in operands
            )


def main(url):
    """Fetch ``url``, rebuild its media entries and print them.

    The text of the first script node containing ``flashvars`` is parsed
    into an AST. ``VarStatement_Visitor`` fills the module-level
    ``flashvars`` list, then one ``Media_Visitor`` per entry restores the
    ``videoUrl`` field.

    Raises:
        StopIteration: if no script text on the page contains ``flashvars``.
    """
    response = requests.get(url=url)
    script = next(
        text
        for text in response.html.xpath('//script//text()')
        if 'flashvars' in text
    )
    # Convert the JavaScript source into an AST.
    tree = Parser().parse(script)

    # Media_Visitor indexes ``flashvars`` by media number, so entries left
    # over from an earlier call would shift every index; start clean.
    flashvars.clear()
    VarStatement_Visitor().visit(tree)
    for i, _ in enumerate(flashvars):
        Media_Visitor(i).visit(tree)
    print(flashvars)