469 lines
22 KiB
Python
469 lines
22 KiB
Python
from bs4 import BeautifulSoup
|
|
import requests
|
|
import json
|
|
import asyncio, io, logging, os, tempfile
|
|
import plugins
|
|
import re
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _initialise(bot):
|
|
plugins.register_user_command(["intel", "iitc", "portalpic", "portalinfo", "active_iitcplugins"])
|
|
plugins.register_admin_command(["setintel", "show_iitcplugins", "set_iitcplugins"])
|
|
_get_iitc_plugins(bot)
|
|
|
|
|
|
@asyncio.coroutine
|
|
def _open_file(name, otype):
|
|
return open(name, otype)
|
|
|
|
@asyncio.coroutine
|
|
def readline(f):
|
|
while True:
|
|
data = f.readline()
|
|
if data:
|
|
return data
|
|
|
|
def _parse_onlineRepos(url, ext=''):
|
|
page = requests.get(url).text
|
|
if 'gitlab.com' in url:
|
|
files = []
|
|
for json_page in json.loads(page):
|
|
for attribute, value in json_page.items():
|
|
if attribute == "name":
|
|
if value.endswith(ext):
|
|
files.append(url.replace("/tree/", "/blobs/master") + "&filepath=" + value)
|
|
return files
|
|
elif 'github.com' in url:
|
|
files = []
|
|
for attribute, value in json.loads(page).items():
|
|
if attribute == "tree":
|
|
for tree in value:
|
|
for attribute, value in tree.items():
|
|
if attribute == "path":
|
|
if value.endswith(ext) and not 'total-conversion-build.user.js' in value and not 'user-location.user.js' in value and not 'test' in value:
|
|
files.append(url.replace("https://api.github.com/repos/", "https://raw.githubusercontent.com/").replace("git/trees/",'').replace("master?recursive=1","master/") + value)
|
|
return files
|
|
else:
|
|
soup = BeautifulSoup(page, 'html.parser')
|
|
return [url + '/' + node.get('href') for node in soup.find_all('a') if node.get('href').endswith(ext)]
|
|
|
|
def _get_iitc_plugins(bot):
|
|
logger.debug("getting availible plugins")
|
|
if bot.config.exists(["intel_screenbot", "gitlab_token"]):
|
|
token = bot.config.get_by_path(["intel_screenbot", "gitlab_token"])
|
|
url_config = bot.config.get_by_path(["intel_screenbot", "plugin_dirs"])
|
|
ext = '.user.js'
|
|
data=[]
|
|
for url in url_config:
|
|
if ext in url:
|
|
item = {"name": url.split('/', url.count('/'))[-1].replace(ext, ''), "url": url}
|
|
data.append(item)
|
|
else:
|
|
if "gitlab.com" in url:
|
|
url = url + '?private_token=' + token
|
|
for file in _parse_onlineRepos(url, ext):
|
|
if "gitlab.com" in url:
|
|
item = {"name": file.split('=', file.count('='))[-1].replace(ext, ''), "url": file}
|
|
elif 'github.com' in url:
|
|
item = {"name": file.split('/', file.count('/'))[-1].replace(ext, ''), "url": file}
|
|
else:
|
|
item = {"name": file.split('/', file.count('/'))[-1].replace(ext, ''), "url": file}
|
|
data.append(item)
|
|
|
|
iitc_plugins = data
|
|
if bot.memory.exists(["iitc_plugins"]):
|
|
bot.memory.pop_by_path(["iitc_plugins"])
|
|
bot.memory.set_by_path(["iitc_plugins"], iitc_plugins)
|
|
else:
|
|
bot.memory.set_by_path(["iitc_plugins"], iitc_plugins)
|
|
|
|
def _get_lines(shell_command):
|
|
p = yield from asyncio.create_subprocess_shell(shell_command)
|
|
output = yield from asyncio.wait_for(p.wait(), 240.0)
|
|
return p.returncode, output, True
|
|
|
|
@asyncio.coroutine
|
|
def _screencap(url, args_filepath, filepath, filename, bot, event, arguments):
|
|
os.chmod(filepath, 0o666)
|
|
loop = asyncio.get_event_loop()
|
|
command = 'phantomjs hangupsbot/plugins/intel_screenbot/screencap.js "' + args_filepath + '"'
|
|
task = _get_lines(command)
|
|
task = asyncio.wait_for(task, 420.0)
|
|
exitcode, output, status = yield from task
|
|
|
|
# read the resulting file into a byte array
|
|
# yield from asyncio.sleep(10)
|
|
if (arguments['screenshotfunction'] != 'portalinfoText'):
|
|
file_resource = yield from _open_file(filepath, 'rb')
|
|
file_data = yield from loop.run_in_executor(None, file_resource.read)
|
|
image_data = yield from loop.run_in_executor(None, io.BytesIO, file_data)
|
|
if status:
|
|
try:
|
|
image_id = yield from bot._client.upload_image(image_data, filename=filename)
|
|
yield from bot.coro_send_message(event.conv.id_, None, image_id=image_id)
|
|
os.unlink(filepath)
|
|
os.unlink(args_filepath)
|
|
except Exception as e:
|
|
os.unlink(filepath)
|
|
os.unlink(args_filepath)
|
|
logger.exception("upload failed: {}".format(url))
|
|
logger.exception("exception: {}".format(e))
|
|
yield from bot.coro_send_message(event.conv_id, "<i>error uploading screenshot</i>")
|
|
else:
|
|
if status:
|
|
response_file = yield from _open_file(arguments['portalinfoResponse'], 'r')
|
|
response = yield from readline(response_file)
|
|
yield from bot.coro_send_message(event.conv.id_, response.replace("'", ""))
|
|
os.unlink(filepath)
|
|
os.unlink(args_filepath)
|
|
os.unlink(arguments['portalinfoResponse'])
|
|
|
|
def setintel(bot, event, *args):
|
|
"""set url for current converation for the intel or iitc command.
|
|
use /bot clearintel to clear the previous url before setting a new one."""
|
|
|
|
url = bot.conversation_memory_get(event.conv_id, 'IntelURL')
|
|
if url is None:
|
|
bot.conversation_memory_set(event.conv_id, 'IntelURL', ''.join(args))
|
|
html = "<i><b>{}</b> updated screenshot URL".format(event.user.full_name)
|
|
yield from bot.coro_send_message(event.conv, html)
|
|
|
|
else:
|
|
bot.conversation_memory_set(event.conv_id, 'IntelURL', None)
|
|
bot.conversation_memory_set(event.conv_id, 'IntelURL', ''.join(args))
|
|
html = "<i><b>{}</b> updated screenshot URL".format(event.user.full_name)
|
|
yield from bot.coro_send_message(event.conv, html)
|
|
|
|
def portalpic(bot, event, *args):
|
|
"""get a screenshot of the portalInfo by URL."""
|
|
|
|
arguments = {}
|
|
|
|
if args:
|
|
if len(args) > 1:
|
|
url = ' '.join(str(i) for i in args)
|
|
else:
|
|
url = args[0]
|
|
if '"' in url:
|
|
url = url.replace('"', '')
|
|
else:
|
|
html = "<i><b>{}</b> No Arguments provided".format(event.user.full_name)
|
|
yield from bot.coro_send_message(event.conv, html)
|
|
|
|
if bot.config.exists(["intel_screenbot", "email"]):
|
|
if bot.config.exists(["intel_screenbot", "password"]):
|
|
email = bot.config.get_by_path(["intel_screenbot", "email"])
|
|
password = bot.config.get_by_path(["intel_screenbot", "password"])
|
|
arguments['email'] = email
|
|
arguments['password'] = password
|
|
else:
|
|
html = "<i><b>{}</b> No Intel password has been added to config. Unable to authenticate".format(event.user.full_name)
|
|
yield from bot.coro_send_message(event.conv, html)
|
|
else:
|
|
html = "<i><b>{}</b> No Intel Email/password has been added to config. Unable to authenticate".format(event.user.full_name)
|
|
yield from bot.coro_send_message(event.conv, html)
|
|
|
|
if url is None:
|
|
html = "<i><b>{}</b> No Portal URL has been set for screenshots.".format(event.user.full_name)
|
|
yield from bot.coro_send_message(event.conv, html)
|
|
else:
|
|
if re.match(r'(http(s)?:\/\/)', url):
|
|
yield from bot.coro_send_message(event.conv_id, "<i>Portal Picture requested, please wait...</i>")
|
|
filepath = tempfile.NamedTemporaryFile(suffix=".png", delete=False).name
|
|
filename = filepath.split('/', filepath.count('/'))[-1]
|
|
args_filepath = tempfile.NamedTemporaryFile(prefix="args_{}".format(event.conv_id), suffix=".json", delete=False).name
|
|
|
|
arguments['url'] = url
|
|
arguments['filepath'] = filepath
|
|
arguments['screenshotfunction'] = "portalinfoScreen"
|
|
|
|
with open(args_filepath, 'w') as out:
|
|
out.write(json.dumps(arguments))
|
|
|
|
try:
|
|
loop = asyncio.get_event_loop()
|
|
image_data = yield from _screencap(url, args_filepath, filepath, filename, bot, event, arguments)
|
|
except Exception as e:
|
|
yield from bot.coro_send_message(event.conv_id, "<i>error getting screenshot</i>")
|
|
logger.exception("screencap failed".format(url))
|
|
return
|
|
else:
|
|
html = "<i><b>{}</b> No URL found in Arguments".format(event.user.full_name)
|
|
yield from bot.coro_send_message(event.conv, html)
|
|
|
|
def portalinfo(bot, event, *args):
|
|
"""get a of the portalInfo by URL."""
|
|
|
|
arguments = {}
|
|
|
|
if args:
|
|
if len(args) > 1:
|
|
url = ' '.join(str(i) for i in args)
|
|
else:
|
|
url = args[0]
|
|
if '"' in url:
|
|
url = url.replace('"', '')
|
|
else:
|
|
html = "<i><b>{}</b> No Arguments provided".format(event.user.full_name)
|
|
yield from bot.coro_send_message(event.conv, html)
|
|
|
|
if bot.config.exists(["intel_screenbot", "email"]):
|
|
if bot.config.exists(["intel_screenbot", "password"]):
|
|
email = bot.config.get_by_path(["intel_screenbot", "email"])
|
|
password = bot.config.get_by_path(["intel_screenbot", "password"])
|
|
arguments['email'] = email
|
|
arguments['password'] = password
|
|
else:
|
|
html = "<i><b>{}</b> No Intel password has been added to config. Unable to authenticate".format(event.user.full_name)
|
|
yield from bot.coro_send_message(event.conv, html)
|
|
else:
|
|
html = "<i><b>{}</b> No Intel Email/password has been added to config. Unable to authenticate".format(event.user.full_name)
|
|
yield from bot.coro_send_message(event.conv, html)
|
|
|
|
if url is None:
|
|
html = "<i><b>{}</b> No Portal URL has been set for portal Info.".format(event.user.full_name)
|
|
yield from bot.coro_send_message(event.conv, html)
|
|
else:
|
|
if re.match(r'(http(s)?:\/\/)', url):
|
|
yield from bot.coro_send_message(event.conv_id, "<i>Portal Info requested, please wait...</i>")
|
|
filepath = tempfile.NamedTemporaryFile(suffix=".png", delete=False).name
|
|
portalinfoResponse = tempfile.NamedTemporaryFile(prefix="response_{}".format(event.conv_id), delete=False).name
|
|
filename = filepath.split('/', filepath.count('/'))[-1]
|
|
args_filepath = tempfile.NamedTemporaryFile(prefix="args_{}".format(event.conv_id), suffix=".json", delete=False).name
|
|
|
|
arguments['url'] = url
|
|
arguments['filepath'] = filepath
|
|
arguments['screenshotfunction'] = "portalinfoText"
|
|
arguments['portalinfoResponse'] = portalinfoResponse
|
|
|
|
with open(args_filepath, 'w') as out:
|
|
out.write(json.dumps(arguments))
|
|
|
|
try:
|
|
loop = asyncio.get_event_loop()
|
|
image_data = yield from _screencap(url, args_filepath, filepath, filename, bot, event, arguments)
|
|
except Exception as e:
|
|
yield from bot.coro_send_message(event.conv_id, "<i>error getting infos</i>")
|
|
logger.exception("getting infos failed".format(url))
|
|
return
|
|
else:
|
|
html = "<i><b>{}</b> No URL found in Arguments".format(event.user.full_name)
|
|
yield from bot.coro_send_message(event.conv, html)
|
|
|
|
def intel(bot, event, *args):
|
|
"""get a screenshot of a search term or intel URL or the default intel URL of the hangout."""
|
|
|
|
arguments = {}
|
|
|
|
if args:
|
|
if len(args) > 1:
|
|
url = ' '.join(str(i) for i in args)
|
|
else:
|
|
url = args[0]
|
|
if '"' in url:
|
|
url = url.replace('"', '')
|
|
else:
|
|
try:
|
|
url = bot.conversation_memory_get(event.conv_id, 'IntelURL')
|
|
except:
|
|
url = 'https://www.ingress.com/intel'
|
|
|
|
if bot.config.exists(["intel_screenbot", "email"]):
|
|
if bot.config.exists(["intel_screenbot", "password"]):
|
|
email = bot.config.get_by_path(["intel_screenbot", "email"])
|
|
password = bot.config.get_by_path(["intel_screenbot", "password"])
|
|
arguments['email'] = email
|
|
arguments['password'] = password
|
|
else:
|
|
html = "<i><b>{}</b> No Intel password has been added to config. Unable to authenticate".format(event.user.full_name)
|
|
yield from bot.coro_send_message(event.conv, html)
|
|
else:
|
|
html = "<i><b>{}</b> No Intel Email/password has been added to config. Unable to authenticate".format(event.user.full_name)
|
|
yield from bot.coro_send_message(event.conv, html)
|
|
|
|
if url is None:
|
|
html = "<i><b>{}</b> No Intel URL or search term has been set for screenshots.".format(event.user.full_name)
|
|
yield from bot.coro_send_message(event.conv, html)
|
|
|
|
else:
|
|
if re.match(r'(http(s)?:\/\/)', url):
|
|
search = 'nix'
|
|
zoomParameter = re.search(r"(?:&z=)", url, re.IGNORECASE)
|
|
if zoomParameter:
|
|
ZoomSearch = re.finditer(r"(?:&z=).*", url, flags=re.I)
|
|
for matchNum, zoomlevel_raw in enumerate(ZoomSearch):
|
|
matchNum = matchNum + 1
|
|
zoomlevel_clean = zoomlevel_raw.group()
|
|
zoomlevel = zoomlevel_clean[3:][:2]
|
|
if zoomlevel.isdigit():
|
|
yield from bot.coro_send_message(event.conv_id, "<i>intel map at zoom level "+ zoomlevel + " requested, please wait...</i>")
|
|
else:
|
|
yield from bot.coro_send_message(event.conv_id, "<i>intel map requested, please wait...</i>")
|
|
else:
|
|
search = url
|
|
zoomParameter = re.search(r"(?<=z=)", url, re.IGNORECASE)
|
|
if zoomParameter:
|
|
ZoomSearch = re.finditer(r"(?<=z=)[^\s]+", url, flags=re.I)
|
|
for matchNum, zoomlevel_raw in enumerate(ZoomSearch):
|
|
matchNum = matchNum + 1
|
|
zoomlevel = zoomlevel_raw.group()
|
|
search = search.replace("z={}".format(zoomlevel),"")
|
|
if zoomlevel.isdigit():
|
|
yield from bot.coro_send_message(event.conv_id, "<i>searching " + search + " on intel map and screenshooting at zoom level "+ zoomlevel + " as requested, please wait...</i>")
|
|
arguments['zoomlevel'] = str(zoomlevel)
|
|
else:
|
|
yield from bot.coro_send_message(event.conv_id, "<i>searching " + search + " on intel map and screenshooting as requested, please wait...</i>")
|
|
url = 'https://www.ingress.com/intel'
|
|
|
|
filepath = tempfile.NamedTemporaryFile(suffix=".png", delete=False).name
|
|
filename = filepath.split('/', filepath.count('/'))[-1]
|
|
args_filepath = tempfile.NamedTemporaryFile(prefix="args_{}".format(event.conv_id), suffix=".json", delete=False).name
|
|
|
|
arguments['search'] = search
|
|
arguments['url'] = url
|
|
arguments['filepath'] = filepath
|
|
arguments['maptype'] = "intel"
|
|
arguments['screenshotfunction'] = "map"
|
|
|
|
with open(args_filepath, 'w') as out:
|
|
out.write(json.dumps(arguments))
|
|
|
|
try:
|
|
loop = asyncio.get_event_loop()
|
|
image_data = yield from _screencap(url, args_filepath, filepath, filename, bot, event, arguments)
|
|
except Exception as e:
|
|
yield from bot.coro_send_message(event.conv_id, "<i>error getting screenshot</i>")
|
|
logger.exception("screencap failed".format(url))
|
|
return
|
|
|
|
|
|
def iitc(bot, event, *args):
|
|
"""get a screenshot of a search term or intel URL or the default intel URL of the hangout."""
|
|
|
|
arguments = {}
|
|
|
|
if args:
|
|
if len(args) > 1:
|
|
url = ' '.join(str(i) for i in args)
|
|
else:
|
|
url = args[0]
|
|
if '"' in url:
|
|
url = url.replace('"', '')
|
|
else:
|
|
url = bot.conversation_memory_get(event.conv_id, 'IntelURL')
|
|
|
|
if bot.config.exists(["intel_screenbot", "email"]):
|
|
if bot.config.exists(["intel_screenbot", "password"]):
|
|
email = bot.config.get_by_path(["intel_screenbot", "email"])
|
|
password = bot.config.get_by_path(["intel_screenbot", "password"])
|
|
arguments['email'] = email
|
|
arguments['password'] = password
|
|
else:
|
|
html = "<i><b>{}</b> No Intel password has been added to config. Unable to authenticate".format(event.user.full_name)
|
|
yield from bot.coro_send_message(event.conv, html)
|
|
else:
|
|
html = "<i><b>{}</b> No Intel Email/password has been added to config. Unable to authenticate".format(event.user.full_name)
|
|
yield from bot.coro_send_message(event.conv, html)
|
|
|
|
if url is None:
|
|
html = "<i><b>{}</b> No Intel URL has been set for screenshots.".format(event.user.full_name)
|
|
yield from bot.coro_send_message(event.conv, html)
|
|
|
|
else:
|
|
if re.match(r'(http(s)?:\/\/)', url):
|
|
search = 'nix'
|
|
zoomParameter = re.search(r"(?:&z=)", url, re.IGNORECASE)
|
|
if zoomParameter:
|
|
ZoomSearch = re.finditer(r"(?:&z=).*", url, flags=re.I)
|
|
for matchNum, zoomlevel_raw in enumerate(ZoomSearch):
|
|
matchNum = matchNum + 1
|
|
zoomlevel_clean = zoomlevel_raw.group()
|
|
zoomlevel = zoomlevel_clean[3:][:2]
|
|
if zoomlevel.isdigit():
|
|
yield from bot.coro_send_message(event.conv_id, "<i>iitc map at zoom level "+ zoomlevel + " requested, please wait...</i>")
|
|
else:
|
|
yield from bot.coro_send_message(event.conv_id, "<i>iitc map requested, please wait...</i>")
|
|
else:
|
|
search = url
|
|
zoomParameter = re.search(r"(?<=z=)", url, re.IGNORECASE)
|
|
if zoomParameter:
|
|
ZoomSearch = re.finditer(r"(?<=z=)[^\s]+", url, flags=re.I)
|
|
for matchNum, zoomlevel_raw in enumerate(ZoomSearch):
|
|
matchNum = matchNum + 1
|
|
zoomlevel = zoomlevel_raw.group()
|
|
search = search.replace("z={}".format(zoomlevel),"")
|
|
if zoomlevel.isdigit():
|
|
yield from bot.coro_send_message(event.conv_id, "<i>searching " + search + " on iitc map and screenshooting it at zoom level "+ zoomlevel + " as requested, please wait...</i>")
|
|
arguments['zoomlevel'] = str(zoomlevel)
|
|
else:
|
|
yield from bot.coro_send_message(event.conv_id, "<i>searching " + search + " on iitc map and and screenshooting it as requested, please wait...</i>")
|
|
url = 'https://www.ingress.com/intel'
|
|
|
|
filepath = tempfile.NamedTemporaryFile(suffix=".png", delete=False).name
|
|
filename = filepath.split('/', filepath.count('/'))[-1]
|
|
args_filepath = tempfile.NamedTemporaryFile(prefix="args_{}".format(event.conv_id), suffix=".json", delete=False).name
|
|
logger.debug("temporary screenshot file: {}".format(filepath))
|
|
logger.debug("temporary args file: {}".format(args_filepath))
|
|
if bot.conversation_memory_get(event.conv_id, 'iitc_plugins'):
|
|
plugins = []
|
|
plugin_names = bot.conversation_memory_get(event.conv_id, 'iitc_plugins').split(", ")
|
|
if bot.memory.exists(["iitc_plugins"]):
|
|
for plugin_objects in bot.memory.get_by_path(["iitc_plugins"]):
|
|
for plugin_name in plugin_names:
|
|
if plugin_objects["name"] == plugin_name:
|
|
plugins.append(plugin_objects["url"])
|
|
else:
|
|
plugins = ''
|
|
|
|
arguments['plugins'] = plugins
|
|
arguments['search'] = search
|
|
arguments['url'] = url
|
|
arguments['filepath'] = filepath
|
|
arguments['maptype'] = "iitc"
|
|
arguments['screenshotfunction'] = "map"
|
|
|
|
with open(args_filepath, 'w') as out:
|
|
out.write(json.dumps(arguments))
|
|
|
|
try:
|
|
loop = asyncio.get_event_loop()
|
|
image_data = yield from _screencap(url, args_filepath, filepath, filename, bot, event, arguments)
|
|
except Exception as e:
|
|
yield from bot.coro_send_message(event.conv_id, "<i>error getting screenshot</i>")
|
|
logger.exception("screencap failed".format(url))
|
|
return
|
|
|
|
def show_iitcplugins(bot, event, *args):
|
|
"""Shows all available iitc Plugins."""
|
|
if bot.memory.exists(["iitc_plugins"]):
|
|
plugin_names = []
|
|
for plugin_objects in bot.memory.get_by_path(["iitc_plugins"]):
|
|
for attribute, value in plugin_objects.items():
|
|
if attribute == "name":
|
|
plugin_names.append(value)
|
|
yield from bot.coro_send_to_user_and_conversation(event.user.id_.chat_id, event.conv_id, "<i><b>IITC Plugins:</b><br> {}</i>".format(', <br>'.join(str(i) for i in plugin_names)), _("<i><b>{}</b>, I've sent you the plugins ;)</i>").format(event.user.full_name))
|
|
|
|
def set_iitcplugins(bot, event, *args):
|
|
"""sets the activated iitc plugins for a hangout."""
|
|
if not bot.conversation_memory_get(event.conv_id, 'iitc_plugins') is None:
|
|
bot.conversation_memory_set(event.conv_id, 'iitc_plugins', None)
|
|
bot.conversation_memory_set(event.conv_id, 'iitc_plugins', ', '.join(args))
|
|
html = "<i><b>{}</b> updated plugins".format(event.user.full_name)
|
|
yield from bot.coro_send_message(event.conv, html)
|
|
else:
|
|
bot.conversation_memory_set(event.conv_id, 'iitc_plugins', ', '.join(args))
|
|
html = "<i><b>{}</b> updated plugins".format(event.user.full_name)
|
|
yield from bot.coro_send_message(event.conv, html)
|
|
|
|
def active_iitcplugins(bot, event, *args):
|
|
"""shows the activated iitc plugins for a hangout."""
|
|
if bot.conversation_memory_get(event.conv_id, 'iitc_plugins') is None:
|
|
html = "<i><b>{}</b> no active IITC Plugins for this conversation".format(event.user.full_name)
|
|
yield from bot.coro_send_message(event.conv, html)
|
|
else:
|
|
plugins = bot.conversation_memory_get(event.conv_id, 'iitc_plugins')
|
|
html = "<i><b>{}</b> plugins for this conversation: {}<br />".format(event.user.full_name, plugins)
|
|
yield from bot.coro_send_message(event.conv, html)
|