1
0
Fork 0

moved to a new repository

This commit is contained in:
Kim, Jimin 2021-08-08 12:17:57 +09:00
parent 6f02218e3e
commit 998ff503e3
19 changed files with 2 additions and 2524 deletions

BIN
.github/example.png vendored

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.7 MiB

BIN
.github/logo.png vendored

Binary file not shown.

Before

Width:  |  Height:  |  Size: 9.3 KiB

17
.gitignore vendored
View file

@ -1,17 +0,0 @@
_/
secrets/
__pycache__/
config.json
# Created by https://www.toptal.com/developers/gitignore/api/vscode
# Edit at https://www.toptal.com/developers/gitignore?templates=vscode
### vscode ###
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
*.code-workspace
# End of https://www.toptal.com/developers/gitignore/api/vscode

12
.vscode/settings.json vendored
View file

@ -1,12 +0,0 @@
{
"editor.formatOnSave": true,
"editor.formatOnPaste": true,
"python.formatting.provider": "black",
"python.analysis.extraPaths": [
"./bot"
],
"cSpell.words": [
"pinnable"
]
}

21
LICENSE
View file

@ -1,21 +0,0 @@
MIT License
Copyright (c) 2019 developomp
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View file

@ -1,64 +1,2 @@
# [llama bot](https://github.com/developomp/llama-bot)
<div align="center">
<a href="https://opensource.org/licenses/MIT"><img alt="MIT License" src="https://img.shields.io/github/license/developomp/llama-bot?style=flat-square" /></a>
<a href="https://www.python.org/downloads/release/python-395"><img alt="Python version 3.9" src="https://img.shields.io/badge/python-3.9-blue?style=flat-square" /></a>
<a href="https://github.com/psf/black"><img alt="Code style: black" src="https://img.shields.io/badge/code style-black-000000.svg?style=flat-square"></a>
<br />
<br />
<img alt="llama logo" src=".github/logo.png" />
</div>
> **WARNING: THIS REPOSITORY CONTAINS NSFW CONTENT**
llama bot is a [discord](https://discord.com) bot made for the LP community [server](https://discord.gg/2fsar34APa).<br />
- Invitation of this bot to other server is blocked. You wll have to host the bot yourself if you want it on your discord server.
- The bot requires Python version 3.9 or greater.
- local configuration file and database does not exist. Everything is stored in google firebase. Portability ftw!
Example:
![example image of bot usage](.github/example.png)
# Setting up locally
Steps:
1. Clone this repository
- `git clone https://github.com/developomp/llama-bot.git`
2. Install dependencies ([requirements.txt](./requirements.txt) is in project root)
- `pip install -r requirements.txt`
3. create `secrets` directory under `bot` directory
4. Create a new discord bot
- https://discord.com/developers/applications
5. Create a firebase project and create firestore database (production mode is highly recommended)
- https://console.firebase.google.com
6. Generate and download service account key from firebase ([instruction](https://firebase.google.com/docs/admin/setup#initialize-sdk)), rename it to `firebase-adminsdk.json`, and put it in `secrets` directory.
- https://console.firebase.google.com/project/_/settings/serviceaccounts/adminsdk
7. create `secret.json` in `secrets` directory and put the discord bot token
```json
{
"token": "<TOKEN>"
}
```
8. Start the bot (working directory must be set to `bot` directory)
- `python llama.py`
More info:
- discord developers documentation: https://discord.com/developers/docs
- [discord python API](https://github.com/Rapptz/discord.py) documentation: https://discordpy.readthedocs.io
- firebase admin sdk documentation: https://firebase.google.com/docs
# contributing
- Usage of [vscode](https://code.visualstudio.com) is highly recommended
- Format python code using the [black](https://github.com/psf/black) formatter
- Format markdown file(s) with [prettier](https://prettier.io) formatter
# Special thanks
- `Davidisacookie#9888 (265697563280146433)` for making the bot icon
**ATTENTION!**<br />
The llama bot has found a new home: https://github.com/llama-bot/llama-bot

View file

@ -1,318 +0,0 @@
from llama import Llama
import cogs._util as util
import discord
from discord.ext import commands
import json
class Admin(commands.Cog):
def __init__(self, bot):
self.bot: Llama = bot
self.allowed_channels: list[discord.TextChannel] = [
self.bot.LP_SERVER.get_channel(int(self.bot.VARS["channels"]["ADMIN_BOT"])),
]
self.draft_common_message = "required to create a new draft"
async def cog_check(self, ctx: commands.Context):
if exception_or_bool := await util.on_pm(ctx.message, self.bot):
raise exception_or_bool
return not exception_or_bool
def draft_id_required_error_message(self, draft_id):
"""checks if draft id is passed in draft command"""
if not draft_id:
raise discord.ext.commands.errors.MissingRequiredArgument(
"draft id is" + self.draft_common_message
)
def draft_ids_are_required_error_message(self, draft_id, draft_id2):
"""checks if draft id 1 and 2 are passed in draft command"""
if not (draft_id and draft_id2):
raise discord.ext.commands.errors.MissingRequiredArgument(
"draft ids are" + self.draft_common_message
)
async def error_if_not_in_admin_channel(self, ctx: discord.ext.commands.Context):
if ctx.message.channel not in self.allowed_channels:
allowed_in = ", ".join(
[channel_id.mention for channel_id in self.allowed_channels]
)
raise util.NotAdminChannel(
f"Admin commands can only be executed in: {allowed_in}"
)
@commands.command(
aliases=[
"m",
],
help="Send and edit messages so other mods can change rules and stuff",
usage="""Build and edit messages using the draft feature.
Type `{prefix}help draft` for more information.
> {prefix}{command} <operation> <selector> <message draft id>
selector: message url, `CHANNEL_ID/MESSAGE_ID` path, channel id, channel mention, channel url
message draft id: type `{prefix}draft list` to list available ids
Sending message:
> {prefix}{command} <send/create/s/c> <channel selector> <message draft id>
Deleting message:
> {prefix}{command} <delete/remove/d> <message selector> <message draft id>
Replacing message:
> {prefix}{command} <replace/r> <message selector> <message draft id>
""",
)
@util.must_be_admin()
async def message(self, ctx, msg_or_channel_path: str = None, draft_id: str = None):
await self.error_if_not_in_admin_channel(ctx)
await ctx.send(
embed=discord.Embed(
title="Not ready yet",
description="This feature is not implemented yet.",
)
)
@commands.command(
aliases=[
"d",
"drafts",
],
help="Create and edit message drafts for the `message` command",
usage="""> {prefix}{command} <operation> <draft id 1> <draft id 2>
draft ids: string with no space (preferably separated by underscores `_`).
Creating new draft:
> {prefix}{command} <create/new/c/n> <draft id>
Removing draft:
> {prefix}{command} <remove/delete/del> <draft id>
Editing draft:
> {prefix}{command} <edit/e> <draft id>
Duplicating draft:
> {prefix}{command} <duplicate/copy/cp> <source draft id> <new draft id>
Renaming draft:
> {prefix}{command} <rename/rn> <old draft id> <new draft id>
Previewing draft:
> {prefix}{command} <preview/p> <draft id>
View raw data of a draft
> {prefix}{command} <view/v> <draft id>
Listing available drafts:
> {prefix}{command} <list/l>
""",
)
@util.must_be_admin()
async def draft(
self, ctx, operation: str, draft_id: str = None, draft_id2: str = None
):
"""https://discord.com/developers/docs/resources/channel#embed-object"""
await self.error_if_not_in_admin_channel(ctx)
if operation in ["create", "new", "c", "n"]:
self.draft_id_required_error_message(draft_id)
try: # check if draft already exists
if self.bot.VARS["drafts"][draft_id]:
await ctx.send(
embed=discord.Embed(
title="Draft already exists",
description="A draft with the same id was found. Edit it, or delete and create a new one.",
)
)
return
except KeyError:
pass # draft does not exist. Can create a new one.
# create new embed with default values
new_embed_data = dict()
new_embed_data["title"] = "title"
new_embed_data["description"] = "description"
new_embed_data["color"] = 0
new_embed_data["fields"] = [
{"name": "name1", "value": "value1", "inline": True},
{"name": "name2", "value": "value2", "inline": True},
]
# update local and database vars
json_embed_data = json.dumps(new_embed_data)
self.bot.VARS["drafts"][draft_id] = json_embed_data
self.bot.llama_firebase.create("vars", "drafts", draft_id, json_embed_data)
# feedback
await ctx.send(
embed=discord.Embed(
title="Success!",
description=f"New draft `{draft_id}` has been created.",
)
)
elif operation in ["remove", "delete", "del"]:
self.draft_id_required_error_message(draft_id)
try:
# delete draft from local variable
del self.bot.VARS["drafts"][draft_id]
except KeyError: # when draft id is not found
await ctx.send(
embed=discord.Embed(
title="Draft does not exist",
description=f"A draft with the id `{draft_id}` was not found. You're trying to delete a draft that does not exist.",
)
)
return
# delete if it's not found in local variable
self.bot.llama_firebase.delete("vars", "drafts", draft_id)
# feedback
await ctx.send(
embed=discord.Embed(
title="Success!",
description=f"Draft `{draft_id}` has been successfully removed.",
)
)
elif operation in ["edit", "e"]:
self.draft_id_required_error_message(draft_id)
try: # check if draft exists
self.bot.VARS["drafts"][draft_id]
except KeyError: # draft does not exist. Show error message.
await ctx.send(
embed=discord.Embed(
title="Draft does not exists",
description=f"A draft with the id {draft_id} was not found. Create it first.",
)
)
return
# create new embed with default values
new_embed_data = json.loads(self.bot.VARS["drafts"][draft_id])
new_embed_data["title"] = "title"
new_embed_data["description"] = "description"
new_embed_data["color"] = 0
new_embed_data["fields"] = [
{"name": "name1", "value": "value1", "inline": True},
{"name": "name2", "value": "value2", "inline": True},
]
# update local and database vars
json_embed_data = json.dumps(new_embed_data)
self.bot.VARS["drafts"][draft_id] = json_embed_data
self.bot.llama_firebase.write("vars", "drafts", draft_id, json_embed_data)
# feedback
await ctx.send(
embed=discord.Embed(
title="Success!",
description=f"Draft `{draft_id}` has been successfully edited.",
)
)
elif operation in ["duplicate", "copy", "cp"]:
self.draft_ids_are_required_error_message(draft_id, draft_id2)
try: # check if draft id is available
if self.bot.VARS["drafts"][draft_id2]:
await ctx.send(
embed=discord.Embed(
title="Name not available",
description=f"A draft with the id `{draft_id2}` already exists. Choose another id.",
)
)
return
except KeyError:
pass # draft id is available. Can create a copy.
# update local and database vars
self.bot.VARS["drafts"][draft_id2] = self.bot.VARS["drafts"][draft_id]
self.bot.llama_firebase.create(
"vars", "drafts", draft_id2, self.bot.VARS["drafts"][draft_id2]
)
# feedback
await ctx.send(
embed=discord.Embed(
title="Success!",
description=f"New draft `{draft_id2}` has been created from `{draft_id}`.",
)
)
elif operation in ["rename", "rn"]:
self.draft_ids_are_required_error_message(draft_id, draft_id2)
try: # check if draft id is available
if self.bot.VARS["drafts"][draft_id2]:
await ctx.send(
embed=discord.Embed(
title="Name not available",
description=f"A draft with the id `{draft_id2}` already exists. Choose another id.",
)
)
return
except KeyError:
pass # draft id is available. Can rename draft.
# update local and database vars
self.bot.VARS["drafts"][draft_id2] = self.bot.VARS["drafts"][draft_id]
del self.bot.VARS["drafts"][draft_id]
self.bot.llama_firebase.delete("vars", "drafts", draft_id)
self.bot.llama_firebase.create(
"vars", "drafts", draft_id2, self.bot.VARS["drafts"][draft_id2]
)
# feedback
await ctx.send(
embed=discord.Embed(
title="Success!",
description=f"Draft `{draft_id}` has been renamed to`{draft_id2}`.",
)
)
elif operation in ["preview", "p"]:
self.draft_id_required_error_message(draft_id)
try:
await ctx.send(
f"> preview of draft **{draft_id}**",
embed=discord.Embed.from_dict(
json.loads(self.bot.VARS["drafts"][draft_id])
),
)
except KeyError:
await ctx.send(
embed=discord.Embed(
title="Cannot find draft",
description=f"draft `{draft_id}` was not found in the draft database.",
)
)
elif operation in ["view", "v"]:
self.draft_id_required_error_message(draft_id)
await ctx.send(
embed=discord.Embed(
title=draft_id,
description="```%s```"
% json.dumps(
json.loads(self.bot.VARS["drafts"][draft_id]), indent=4
),
)
)
elif operation in ["list", "l"]:
await ctx.send(
embed=discord.Embed(
title="List of drafts",
description="Nothing found"
if not self.bot.VARS["drafts"]
else "".join([f"\n- **{i}**" for i in self.bot.VARS["drafts"]]),
)
)
def setup(bot):
bot.add_cog(Admin(bot))

View file

@ -1,195 +0,0 @@
from llama import Llama
from . import _util as util
import discord
from discord.ext import commands
import datetime
class Logging(commands.Cog):
def __init__(self, bot):
self.bot: Llama = bot
self.log_edit = True
self.remember_how_many = 3 # how many messages it will remember
# {CHANNEL_ID: [MSG_UPDATE, ...], ..}
self.edits: dict[int, list[discord.RawMessageUpdateEvent]] = dict()
# {CHANNEL_ID: [[DELETE_TIME, MSG_DELETE], ...], ...}
self.deletes: dict[
int, list[list[datetime.datetime, discord.RawMessageDeleteEvent]]
] = dict()
# {CHANNEL_ID: [[DELETE_TIME, MSG_BULK_DELETE], ...], ...}
self.bulk_deletes: dict[
int, list[list[datetime.datetime, discord.RawBulkMessageDeleteEvent]]
] = dict()
# {AUTHOR_ID: {CHANNEL_ID: [MSG_UPDATE, ...], ...}, ...}
# {AUTHOR_ID: {CHANNEL_ID: [MSG_UPDATE, ...], ...}, ...}
self.edits_per_user: dict[
int, dict[int, list[discord.RawMessageUpdateEvent]]
] = dict()
# {AUTHOR_ID: {CHANNEL_ID: [MSG_DELETE, ...], ...}, ...}
self.deletes_per_user: dict[
int, dict[int, list[discord.RawMessageDeleteEvent]]
] = dict()
async def cog_check(self, ctx: commands.Context):
if exception_or_bool := await util.on_pm(ctx.message, self.bot):
raise exception_or_bool
return not exception_or_bool
@commands.Cog.listener()
async def on_raw_message_edit(self, payload: discord.RawMessageUpdateEvent):
# channel level
if payload.channel_id not in self.edits.keys():
self.edits[payload.channel_id] = []
self.edits[payload.channel_id].append(payload)
if len(self.edits[payload.channel_id]) > self.remember_how_many:
del self.edits[payload.channel_id][0]
# user level
try:
author_id: int = int(payload.data["author"]["id"])
except KeyError:
return
if author_id not in self.edits_per_user.keys():
self.edits_per_user[author_id] = dict()
if payload.channel_id not in self.edits_per_user[author_id].keys():
self.edits_per_user[author_id][payload.channel_id] = []
self.edits_per_user[author_id][payload.channel_id].append(payload)
if (
len(self.edits_per_user[author_id][payload.channel_id])
> self.remember_how_many
):
# todo: queue task to prevent timing issue causing the deletion of the same message
del self.edits_per_user[author_id][payload.channel_id][0]
@commands.Cog.listener()
async def on_raw_message_delete(self, payload: discord.RawMessageDeleteEvent):
# channel level
if payload.channel_id not in self.deletes.keys():
self.deletes[payload.channel_id] = []
self.deletes[payload.channel_id].append([datetime.datetime.utcnow(), payload])
if len(self.deletes[payload.channel_id]) > self.remember_how_many:
del self.deletes[payload.channel_id][0]
# user level
if payload.cached_message:
author_id = payload.cached_message.author.id
if author_id not in self.deletes_per_user.keys():
self.deletes_per_user[author_id] = dict()
if payload.channel_id not in self.deletes_per_user[author_id].keys():
self.deletes_per_user[author_id][payload.channel_id] = []
self.deletes_per_user[author_id][payload.channel_id].append(payload)
if (
len(self.deletes_per_user[author_id][payload.channel_id])
> self.remember_how_many
):
# todo: queue task to prevent timing issue causing the deletion of the same message
del self.deletes_per_user[author_id][payload.channel_id][0]
@commands.Cog.listener()
async def on_raw_bulk_message_delete(
self, payload: discord.RawBulkMessageDeleteEvent
):
if payload.channel_id not in self.bulk_deletes.keys():
self.bulk_deletes[payload.channel_id] = []
self.bulk_deletes[payload.channel_id].append(
[datetime.datetime.utcnow(), payload]
)
if len(self.bulk_deletes[payload.channel_id]) > self.remember_how_many:
del self.bulk_deletes[0]
async def log_edit(
self, channel: discord.TextChannel, payload: discord.RawMessageUpdateEvent
):
await channel.send(
embed=discord.Embed(
title="Message Edited",
description=f"""**@<PLAYER> Edited their message in: <#CHANNEL>
[jump to message](URL)**""",
)
.add_field(
name="before", value=payload.cached_message.content, inline=False
)
.add_field(name="after", value=payload.data["content"], inline=False)
)
async def log_delete(
self, channel: discord.TextChannel, payload: discord.RawMessageDeleteEvent
):
await channel.send(embed=discord.Embed(title="Message Deleted", description=""))
async def log_bulk_delete(
self, channel: discord.TextChannel, payload: discord.RawBulkMessageDeleteEvent
):
await channel.send(
embed=discord.Embed(title="Bulk Message Deletion", description="")
)
'''
@commands.command(
help="Shows recently deleted messages and its content. Memory does NOT get wiped after some time.",
usage="""> {prefix}{command} {?depth}
Use it after someone deleted their message.
Show most recently deleted message
> {prefix}{command}
Show second most recently deleted message
> {prefix}{command} 2"""
)
async def snipe(self, ctx, depth=1):
if not self.last_del_message:
await ctx.send(f"There's no message deleted since last reboot!")
return
author = self.last_del_message.author
await ctx.send(
embed=discord.Embed(description=f"**Message deleted in:** <#{self.last_del_message.channel.id}> [Jump to Message]({self.last_del_message.jump_url})")
.set_thumbnail(url=author.avatar_url)
.add_field(name="Author", value=f"{author.mention} ({author})", inline=False)
.add_field(name="Content", value=self.last_del_message.content, inline=False)
.set_footer(text=f"Deleted at: {self.last_del_time.strftime('%Y-%m-%d %H:%M:%S')} UTC")
)
@commands.command(
help="Shows most recently edited message and it's content before and after the edit. Memory gets wiped after some time.",
usage="""> {prefix}{command} {?depth}
Use it after someone edited their message."""
)
async def editsnipe(self, ctx, depth=1):
if not self.last_edit_after:
await ctx.send(f"There's no message edited since last reboot!")
return
author = self.last_edit_after.author
await ctx.send(
embed=discord.Embed(description=f"**Message Edited in:** <#{self.last_edit_after.channel.id}> [Jump to Message]({self.last_edit_after.jump_url})")
.set_thumbnail(url=author.avatar_url)
.add_field(name="Author", value=f"{author.mention} ({author})", inline=False)
.add_field(name="Before", value=self.last_edit_before.content, inline=False)
.add_field(name="After", value=self.last_edit_after.content, inline=False)
.set_footer(text=f"Edited at: {self.last_edit_after.edited_at.strftime('%Y-%m-%d %H:%M:%S')} UTC")
)
'''
def setup(bot):
bot.add_cog(Logging(bot))

View file

@ -1,258 +0,0 @@
from llama import Llama
from . import _util as util
import discord
from discord.ext import commands
import re
class SelfRole(commands.Cog):
def __init__(self, bot):
self.bot: Llama = bot
# {"CHANNEL_ID/MESSAGE_ID": ["EMOJI_ID_OR_NAME;ROLE_ID", ...], ...}
self.binds: dict = self.bot.llama_firebase.read("vars", "selfrole_messages")
self.help_msg = ""
self.main_help_fields = [
["Self role", "Only admins can create self role message"],
]
async def cog_check(self, ctx: commands.Context):
if exception_or_bool := await util.on_pm(ctx.message, self.bot):
raise exception_or_bool
return not exception_or_bool
# self role add
@commands.Cog.listener()
async def on_raw_reaction_add(self, payload: discord.RawReactionActionEvent):
if (
payload.member.id == self.bot.user.id
): # ignore if the user who added the reaction is this bot
return
try: # try to add get and add reaction
for emoji_role in self.binds[f"{payload.channel_id}/{payload.message_id}"]:
emoji_id_or_name, role_id = emoji_role.split(";")
assert emoji_id_or_name and role_id
if emoji_id_or_name in [payload.emoji.name, str(payload.emoji.id)]:
await payload.member.add_roles(
self.bot.LP_SERVER.get_role(int(role_id))
)
except KeyError:
pass # no reaction role bind found
# self role remove
@commands.Cog.listener()
async def on_raw_reaction_remove(self, payload: discord.RawReactionActionEvent):
try: # try to get and remove reaction
for emoji_role in self.binds[f"{payload.channel_id}/{payload.message_id}"]:
emoji_id_or_name, role_id = emoji_role.split(";")
assert emoji_id_or_name and role_id
if emoji_id_or_name in [payload.emoji.name, str(payload.emoji.id)]:
await (
await self.bot.LP_SERVER.fetch_member(payload.user_id)
).remove_roles(self.bot.LP_SERVER.get_role(int(role_id)))
except KeyError:
pass # no reaction role bind found
@commands.command(
aliases=[
"rr",
],
help="Add reactions for self role to a message.",
usage="""`MESSAGE_PATH`: can be `CHANNEL_ID`/`MESSAGE_ID` or a message jump url.
Add (`EMOJI_1` -> `ROLE_1`) and (`EMOJI_2` -> `ROLE_2`) binds from reaction role message.
> {prefix}{command} <add or a> `MESSAGE_PATH` `EMOJI_1` `ROLE_1` `EMOJI_2` `ROLE_2`
Remove (`EMOJI_1` -> `ROLE_1`) and (`EMOJI_2` -> `ROLE_2`) binds from reaction role message.
> {prefix}{command} <remove or r> `MESSAGE_PATH` `EMOJI_1` `ROLE_1` `EMOJI_2` `ROLE_2`""",
)
@util.must_be_admin()
async def reactrole(
self, ctx, mode, channel_id_or_message_path, *emojis_and_or_roles
):
_mode = None # 0: create, 1: remove
if mode in ["add", "a"]:
_mode = 0
elif mode in ["remove", "r"]:
_mode = 1
else:
raise discord.ext.commands.errors.BadArgument(
f"Invalid input {mode}. Type `-help reactrole` for more information."
)
# parse url or message path
flag = 0
try:
while True: # remove trailing slash(es)
if channel_id_or_message_path[-1] == "/":
channel_id_or_message_path = channel_id_or_message_path[:-1]
else:
break
# get "CHANNEL_ID/MESSAGE_ID" part of the input
# regex: get the last two number/number pattern.
# regex returns the first pattern that it finds so I had to flip the input with [::-1] and flip it back.
# I know. Big brain.
message_path = re.findall(r"\d+/\d+", channel_id_or_message_path[::-1])[0][
::-1
]
channel_id, message_id = message_path.split("/")
flag = 1
channel: discord.TextChannel = await self.bot.fetch_channel(channel_id)
flag = 2
message: discord.Message = await channel.fetch_message(message_id)
except Exception:
title = "parsing message path"
if flag == 1:
title = "fetching channel"
if flag == 2:
title = "fetching message"
await ctx.send(
embed=discord.Embed(
title=f"Error while {title}!",
description="Are you sure the inputs are correct?",
)
)
raise discord.ext.commands.errors.ArgumentParsingError
# If can't pair emojis and roles in groups of 2
if len(emojis_and_or_roles) % 2:
raise discord.ext.commands.errors.BadArgument(
"Failed to group emojis and roles. Number of emojis doesn't match the number of roles."
)
async def group_emojis_and_roles():
"""Pair list by groups of 2.
From: https://stackoverflow.com/a/1751478/12979111
"""
try:
emojis_and_roles_parsed = []
emoji_and_role_ids = []
for emoji_raw, role_raw in [
emojis_and_or_roles[i : i + 2]
for i in range(0, len(emojis_and_or_roles), 2)
]:
# convert raw emoji input to integer if it's a custom emoji. Leave it alone otherwise.
# only custom emojis have "<". (<:emojis_name:emoji_id>)
emoji_id = (
emoji_raw
if "<" not in emoji_raw
else int(re.findall(r"\d+", emoji_raw)[0])
)
role_id = int(re.findall(r"\d+", role_raw)[0])
# check if role and emojis are valid
emoji = (
emoji_raw
if emoji_id == emoji_raw
else self.bot.get_emoji(emoji_id)
)
role = self.bot.LP_SERVER.get_role(role_id)
# todo: check if role is higher than bot
assert emoji, "emoji"
assert role, "role"
emoji_and_role_ids.append([emoji_id, role_id])
emojis_and_roles_parsed.append([emoji, role])
except AssertionError as err:
err_msg = """{one} {two} might not a valid {three} that can be used by the bot.
Are you sure it's not a {three} from other server?""".format(
one=str(err).title(),
two=emoji_raw if err == "emoji" else role_raw,
three=err,
)
await ctx.send(
embed=discord.Embed(
title=f"Error while parsing {err}!", description=err_msg
)
)
raise discord.ext.commands.errors.BadArgument(err_msg)
return emojis_and_roles_parsed, emoji_and_role_ids
if _mode == 0: # add bind
if message_path not in self.binds.keys():
self.binds[message_path] = []
emojis_and_roles_parsed, emoji_and_role_ids = await group_emojis_and_roles()
# add what's not already in the binds
self.binds[message_path] += list(
{f"{emoji};{role}" for emoji, role in emoji_and_role_ids}
- set(self.binds[message_path])
)
# push to firebase
self.bot.llama_firebase.write(
"vars", "selfrole_messages", message_path, self.binds[message_path]
)
# add reaction
for emoji, _ in emojis_and_roles_parsed:
await message.add_reaction(emoji)
elif _mode == 1: # remove bind
if message_path not in self.binds.keys():
await ctx.send(
embed=discord.Embed(
title=":no_entry_sign: Message does not have any reaction role emojis.",
description=f"No reaction role bind was found in the [message]({message.jump_url})",
)
)
# todo: confirm (clear all existing reactions too or leave them?)
emojis_and_roles_parsed, emoji_and_role_ids = await group_emojis_and_roles()
# remove binds
for emoji, role in emoji_and_role_ids:
emoji_role = f"{emoji};{role}"
if emoji_role not in self.binds[message_path]:
await ctx.send(
embed=discord.Embed(
title=":no_entry_sign: Reaction role does not exist",
description="Bind (%s -> %s) was not found in the [message](%s)"
% (
self.bot.get_emoji(emoji),
self.bot.LP_SERVER.get_role(int(role)),
message.jump_url,
),
)
)
continue
self.binds[message_path].remove(emoji_role)
await message.remove_reaction(emoji, self.bot.user)
await ctx.send(
embed=discord.Embed(
title=":white_check_mark: Reaction role bind removed!",
description="Reaction role bind (%s -> %s) has been successfully removed from the [message](%s)!"
% (
self.bot.get_emoji(emoji),
self.bot.LP_SERVER.get_role(int(role)),
message.jump_url,
),
)
)
# remove list if empty
if not self.binds[message_path]:
self.binds.pop(message_path)
self.bot.llama_firebase.delete(
"vars", "selfrole_messages", message_path
)
else:
self.bot.llama_firebase.write(
"vars",
"selfrole_messages",
message_path,
self.binds[message_path],
False,
)
def setup(bot):
bot.add_cog(SelfRole(bot))

View file

@ -1,90 +0,0 @@
# I'm aware that I can use @commands.bot_has_permissions. It's a choice.
from discord import guild
from llama import Llama
import discord
from discord.ext import commands
from datetime import datetime
from typing import Union
import re
class NotAdminChannel(discord.ext.commands.errors.CheckFailure):
pass
def must_be_admin():
"""
Discord bot command decorator.
Put it under @discord.ext.commands.command()
"""
async def predicate(ctx: discord.ext.commands.Context):
if ctx.message.author.guild_permissions.administrator:
return True
await ctx.send(
embed=discord.Embed(
description=f"You need to be a server administrator to issue the command. Aborting."
)
)
return False
return commands.check(predicate)
def lists_has_intersection(list1: list, list2: list) -> bool:
"""
Checks if any of the roles in roles1 is in roles2
"""
return any(element in list1 for element in list2)
def url_from_str(string: str) -> list:
"""
Extract urls from string
https://daringfireball.net/2010/07/improved_regex_for_matching_urls
"""
url = re.findall(
r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'\".,<>?«»“”‘’]))",
string,
)
return [x[0] for x in url]
def snowflake_to_datetime(snowflake: int):
return datetime.utcfromtimestamp(((int(snowflake) >> 22) + 1420070400000) / 1000)
def convert_to_partial_emoji(raw: Union[str, int], bot: Llama) -> discord.PartialEmoji:
# test if raw is an integer
try:
emoji_id: int = int(raw)
# check if emoji exists
emoji: discord.Emoji = bot.LP_SERVER.get_emoji(emoji_id)
if not emoji:
raise Exception(f"Cannot convert {raw} to discord Emoji.")
return discord.PartialEmoji(
name=emoji.name, animated=emoji.animated, id=emoji_id
)
except ValueError: # if emoji_raw is not a number
# todo: test if emoji is valid.
# todo: The checks that can be found online only works for custom emojis and not for emojis like 📌 or 📍.
return discord.PartialEmoji(name=raw.strip())
async def on_pm(message: discord.Message, bot: Llama):
if not message.guild and message.author != bot.user:
err_msg = "DM commands are not supported."
await message.channel.send(
embed=discord.Embed(
title=":exclamation: " + err_msg,
description="react with a `:broom:` :broom: emoji to delete existing messages",
),
delete_after=5.0,
)
return commands.NoPrivateMessage(err_msg)
else:
return False

View file

@ -1,330 +0,0 @@
import wbscraper.player
import wbscraper.commons
import wbscraper.data
import cogs._util as util
import discord
from discord.ext import commands
# todo: add change/remove uid feature
class WarBrokers(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.LLAMA_BOT = self.bot.get_channel_from_vars("LLAMA_BOT")
self.ADMIN_BOT = self.bot.get_channel_from_vars("ADMIN_BOT")
self.BOT_WORK = [self.LLAMA_BOT, self.ADMIN_BOT]
# list of string values of WB servers (ASIA, USA, etc.)
self.WB_GAME_SERVERS: list[str] = [
i
for i in wbscraper.commons.class_to_value_list(wbscraper.data.Location)
if type(i) == str
]
self.help_msg = f"<#{self.bot.VARS['channels']['LLAMAS_AND_PYJAMAS_INFO']}> and <#{self.bot.VARS['channels']['ACTIVE']}> automatically updates when the contents of the database is changed."
self.active_roster_channel = self.bot.LP_SERVER.get_channel(
int(self.bot.VARS["channels"]["ACTIVE"])
)
self.lp_info_channel = self.bot.LP_SERVER.get_channel(
int(self.bot.VARS["channels"]["LLAMAS_AND_PYJAMAS_INFO"])
)
async def update_player(self, user_id: int):
"""Update LP member list message"""
user_id = int(user_id)
player = self.bot.llama_firebase.read("players", user_id)
stat_page_url = wbscraper.URL.join(
wbscraper.URL.stat_root, "players/i", player["uid"]
)
player_wb = wbscraper.player.get_player(stat_page_url)
player_description = (
f"User: {(await self.bot.LP_SERVER.fetch_member(user_id)).mention}\n"
)
try:
player_description += "Preferred Weapon: %s\n" % player["weapon"]
except KeyError:
player_description += "Preferred Weapon: %s\n" % "No Data"
player_description += "K/D: %.4s\n" % player_wb.kdr
try:
player_description += "Time Zone: %s\n" % player["time"]
except KeyError:
player_description += "Time Zone: %s\n" % "No Data"
player_description += f"Stats page: {stat_page_url}\n"
embed = discord.Embed(title=player_wb.nick, description=player_description)
# update if stat message is in the database, add to the database otherwise.
try: # try assuming that the stat message exists already
stat_msg = await self.lp_info_channel.fetch_message(player["message_id"])
await stat_msg.edit(embed=embed)
except (KeyError, discord.NotFound):
info_message = await self.lp_info_channel.send(embed=embed)
self.bot.llama_firebase.write(
"players", user_id, "message_id", info_message.id
)
async def update_active(self):
"""Update active roster message"""
players = self.bot.llama_firebase.read_collection("players")
description = ""
for game_server in self.WB_GAME_SERVERS:
description += f"{game_server}:\n"
region_is_empty = True
for player in players:
try:
if game_server in players[player]["server"]:
description += f"<@{player}>\n"
region_is_empty = False
except KeyError:
pass
if region_is_empty:
description += "-- Empty --\n"
description += "\n"
emoji = discord.utils.get(self.bot.LP_SERVER.emojis, name="blobsalute")
embed = discord.Embed(
title=f"{emoji} LLAMAS PYJAMAS ACTIVE ROSTER {emoji}",
description=description,
)
# make a new message if the active roster doesn't exist, edit otherwise.
try: # assume that the active roster message exists
active_msg = await self.active_roster_channel.fetch_message(
int(self.bot.VARS["messages"]["ACTIVE"])
)
await active_msg.edit(embed=embed)
except (KeyError, discord.NotFound):
active_msg = await self.active_roster_channel.send(embed=embed)
self.bot.llama_firebase.write("vars", "messages", "ACTIVE", active_msg.id)
@commands.command(
help="Sets/updates data in the database.",
usage="""> {prefix}{command} <field> <data>
field: uid | weapon | time | server
data: anything that comes after the field
> {prefix}{command} uid <uid>
**Must run this before running other set commands**
Correlates WB uid with your discord ID. Must be a valid WB uid.
Only correlates with one uid. Alt accounts are not supported yet.
ex:
> {prefix}{command} uid 5d2ead35d142affb05757778
> {prefix}{command} weapon <weapon>
Sets your preferred weapon. No input specifications.
ex:
> {prefix}{command} weapon Sniper & AR
> {prefix}{command} time <time>
Sets your time zone. No input specifications.
ex:
> {prefix}{command} time UTC+8
> {prefix}{command} server <server1> <server2> ...
Sets the servers you usually play on. Use `{prefix}set server help` to get more info.
ex:
> {prefix}{command} server ASIA USA
""",
)
async def set(self, ctx, field: str, *args):
# check if user is authorized to use this command
if not util.lists_has_intersection(
self.bot.LLAMA_PERMS, ctx.message.author.roles
):
await ctx.send(
embed=discord.Embed(
title="Nope!",
description="You need to join the LPWB squad to use this command.",
)
)
return
# Deny usage if the bot is not run in the LPWB bot channel to prevent information leak.
if ctx.message.channel not in self.BOT_WORK:
await ctx.send(
embed=discord.Embed(
description=f"Bruh what do you think {self.LLAMA_BOT.mention} is for?"
)
)
return
field = field.lower()
available_fields = ["uid", "weapon", "time", "server"]
if field not in available_fields:
raise discord.ext.commands.errors.BadArgument(
"first command argument should be one of:\n"
+ "\n- ".join(available_fields)
)
user_exists_in_firestore = self.bot.llama_firebase.exists(
"players", ctx.message.author.id
)
if field == "uid":
original_content = "checking uid validity..."
msg = await ctx.send(embed=discord.Embed(description=original_content))
try:
wbscraper.player.get_player(args[0]) # inefficient but effective
except Exception:
err_msg = f"{original_content}\nuid not valid. Aborting."
await msg.edit(embed=discord.Embed(description=err_msg))
raise discord.ext.commands.errors.BadArgument(err_msg)
if user_exists_in_firestore:
original_content += f"\nuid is valid. Updating uid..."
complete_msg = "Updated uid!"
else:
original_content += f"\nuid is valid. Adding user to database..."
complete_msg = "New player registered!"
await msg.edit(embed=discord.Embed(description=original_content))
self.bot.llama_firebase.create(
"players", ctx.message.author.id, "uid", args[0]
)
await msg.edit(
embed=discord.Embed(description=f"{original_content}\n{complete_msg}")
)
return
if user_exists_in_firestore:
updated_active = False
updated_player_list = False
if field in ["weapon", "time"]:
self.bot.llama_firebase.write(
"players", ctx.message.author.id, field, " ".join(args)
)
await self.update_player(ctx.message.author.id)
updated_player_list = True
elif field == "server":
if all(
i in self.WB_GAME_SERVERS for i in args
): # if all the inputs is a valid server
self.bot.llama_firebase.write(
"players",
ctx.message.author.id,
"server",
",".join(list(dict.fromkeys(args))),
) # Not doing join(args) to remove duplicate inputs
await self.update_active()
updated_active = True
else:
err_message = f"""
1 - List of available servers: {', '.join("`{0}`".format(w) for w in self.WB_GAME_SERVERS)} (case sensitive)
2 - servers should be separated with spaces (not commas)
3 - you can choose multiple servers
ex: `-set server ASIA USA`"""
await ctx.send(embed=discord.Embed(description=err_message))
raise discord.ext.commands.errors.BadArgument(err_message)
description = "Updated %s!"
if updated_active and not updated_player_list:
description = description % f"<#{self.bot.VARS['channels']['ACTIVE']}>"
elif updated_player_list and not updated_active:
description = (
description
% f"<#{self.bot.VARS['channels']['LLAMAS_AND_PYJAMAS_INFO']}>"
)
elif updated_active and updated_player_list:
description = (
description
% f"<#{self.bot.VARS['channels']['ACTIVE']}> and <#{self.bot.VARS['channels']['LLAMAS_AND_PYJAMAS_INFO']}>"
)
await ctx.send(embed=discord.Embed(description=description))
else:
await ctx.send(
embed=discord.Embed(
description=f"You'll have to register first. Type `{self.bot.command_prefix}help set` to get more info"
)
)
@commands.command(
help="Removes data from the database.",
usage="""> {prefix}{command} *<data to remove>
data to remove: weapon, time, server
Removing yourself from active roster and members list is not available yet.
ex:
Removes time and weapon data from the database
> {prefix}{commands} time weapon
""",
)
async def rm(self, ctx, *fields):
# checks if the user is authorized to use this command
if not util.lists_has_intersection(
self.bot.LLAMA_PERMS, ctx.message.author.roles
):
await ctx.send(
embed=discord.Embed(
description=f"Ew! non WB squad member peasant! *spits at {ctx.message.author.mention}*"
)
)
return
# check if the command is run in the right channels
if ctx.message.channel not in self.BOT_WORK:
await ctx.send(
embed=discord.Embed(
description=f"Do it in {self.LLAMA_BOT.mention} dumdum."
)
)
return
if not self.bot.llama_firebase.exists("players", ctx.message.author.id):
await ctx.send(
embed=discord.Embed(
title="User not found in the database",
description="You didn't even register. There's nothing to remove.",
)
)
return
fields = [str(field).lower() for field in fields]
if util.lists_has_intersection(fields, ["weapon", "time", "server"]):
fields_that_does_not_exist = []
for field in fields:
try:
self.bot.llama_firebase.read("players", ctx.message.author.id)[
field
]
except KeyError:
fields_that_does_not_exist.append(field)
if fields_that_does_not_exist:
description = f"field %s is not set or already deleted"
if len(fields_that_does_not_exist) > 1:
description = f"fields %s are not set or already deleted"
await ctx.send(
embed=discord.Embed(
description=description % "/".join(fields_that_does_not_exist)
)
)
for field in fields:
self.bot.llama_firebase.delete("players", ctx.message.author.id, field)
fields_removed = list(set(fields) - set(fields_that_does_not_exist))
if fields_removed:
description = f"field %s removed from `{ctx.message.author.name}`"
if len(fields_removed) > 1:
description = f"fields %s removed from `{ctx.message.author.name}`"
await ctx.send(
embed=discord.Embed(
description=description % "/".join(fields_removed)
)
)
def setup(bot):
bot.add_cog(WarBrokers(bot))

View file

@ -1,158 +0,0 @@
from llama import Llama
from . import _util as util
import discord
from discord.ext import commands
import sys
from time import time
from datetime import timedelta, datetime
class Core(commands.Cog):
def __init__(self, bot):
self.bot: Llama = bot
# remove any potential existing help command to prevent collision
self.bot.remove_command("help")
async def cog_check(self, ctx: commands.Context):
if exception_or_bool := await util.on_pm(ctx.message, self.bot):
raise exception_or_bool
return not exception_or_bool
@commands.command(
help="Shows very basic information about the bot.",
)
async def about(self, ctx):
uptime = str(timedelta(seconds=int(round(time() - self.bot.start_time))))
bot_created_time = datetime.utcfromtimestamp(
((int(self.bot.user.id) >> 22) + 1420070400000) / 1000
)
bot_created_delta = datetime.now() - bot_created_time
await ctx.send(
embed=discord.Embed(title="About llama bot")
.set_thumbnail(url=self.bot.user.avatar_url)
.add_field(
name="Python version",
value=sys.version.split()[0],
)
.add_field(
name="Discord.py version",
value=discord.__version__,
)
.add_field(name="Bot ID", value=f"{self.bot.user.id}")
.add_field(
name="Created in (UTC)",
value=f"{bot_created_time.strftime('%Y-%m-%d %H:%M:%S')} ({bot_created_delta.days} days ago)",
)
.add_field(name="Uptime", value=uptime, inline=False)
.add_field(
name="Source Code",
value="https://github.com/developomp/llama-bot",
inline=False,
)
)
@commands.command(
aliases=[
"h",
],
help="Shows list of helpful information about a command or a cog.",
usage="""> {prefix}{command} <cog | command | None>
ex:
List cogs:
> {prefix}{command}
List commands in the `core` cog:
> {prefix}{command} core
Shows info about `ping` command:
> {prefix}{command} ping""",
)
async def help(self, ctx, cog_str=None):
if not cog_str: # when no argument is passed
help_embed = discord.Embed(
title="Help",
description=f"Use `{self.bot.command_prefix}help <cog>` command to get more information on a cog. (case insensitive)",
)
for cog_name in self.bot.cogs:
# Show command to get help for a cog
help_embed.add_field(
name=cog_name,
value=f"`{self.bot.command_prefix}help {cog_name.lower()}`",
)
for cog_name in self.bot.cogs:
# fields that will be shown in main help embed
try:
for field in self.bot.get_cog(cog_name).main_help_fields:
help_embed.add_field(
name=field[0], value=field[1], inline=False
)
except AttributeError:
# cog doesn't have main_help_fields
pass
await ctx.send(embed=help_embed)
else: # when searching for a specific cog
cogs = list(self.bot.cogs.keys())
lower_cogs = [cog.lower() for cog in cogs]
lower_cog_str = cog_str.lower() # lower case string of cog
if lower_cog_str in lower_cogs: # when a cog has been found
cog = self.bot.get_cog(cogs[lower_cogs.index(lower_cog_str)])
await ctx.send(
embed=discord.Embed(
title=f'"{lower_cog_str}" commands',
description=f"Use `{self.bot.command_prefix}help <command>` to get more information about a command. (case insensitive)\n\n"
# keep cog_help empty if cog.help_msg does not exist
+ (cog.help_msg + ("\n\n" if cog.help_msg else ""))
if hasattr(cog, "help_msg")
else ""
+ f"**Commands:**\n `{'`, `'.join([command.name for command in cog.get_commands()])}`",
)
)
else: # when a cog has not been found
command: discord.ext.commands.Command = discord.utils.get(
self.bot.commands, name=cog_str
)
if not command: # when command was not found
await ctx.send(
embed=discord.Embed(
description=f"Cannot find cog/command **{cog_str}**.\nUse the `{self.bot.command_prefix}help` command to list all enabled cogs."
)
)
else: # when a command has been found
await ctx.send(
embed=discord.Embed(
title=f"{self.bot.command_prefix}{command.name}",
description=(
f"**Aliases:** `{'`, `'.join(command.aliases)}`"
if command.aliases
else ""
)
+ "\n\n**Description:**\n"
+ (
command.help
if command.help
else "Under construction..."
)
+ "\n\n**Usage:**\n"
+ (
command.usage
if command.usage
else "> {prefix}{command}"
).format(
prefix=self.bot.command_prefix, command=command.name
),
)
)
def setup(bot):
bot.add_cog(Core(bot))

View file

@ -1,32 +0,0 @@
from llama import Llama
from . import _util as util
import discord
from discord.ext import commands
class DM(commands.Cog):
def __init__(self, bot):
self.bot: Llama = bot
self.emojis: set[discord.PartialEmoji] = set()
for emoji_raw in self.bot.VARS["dm"]["emojis"]:
self.emojis.add(util.convert_to_partial_emoji(emoji_raw, self.bot))
@commands.Cog.listener()
async def on_raw_reaction_add(self, payload: discord.RawReactionActionEvent):
if payload.guild_id or (payload.emoji not in self.emojis):
return
user: discord.User = await self.bot.fetch_user(payload.user_id)
if not user.dm_channel:
await user.create_dm()
channel: discord.DMChannel = user.dm_channel
message: discord.Message = await channel.fetch_message(payload.message_id)
if message.author == self.bot.user:
await message.delete()
def setup(bot):
bot.add_cog(DM(bot))

View file

@ -1,223 +0,0 @@
from llama import Llama
from . import _util as util
import discord
from discord.ext import commands
import nekos
import random
class Fun(commands.Cog):
def __init__(self, bot):
self.bot: Llama = bot
self.quotes = self.bot.VARS["settings"]["quotes"]
random.shuffle(self.quotes)
self.quote_index = 0
async def cog_check(self, ctx: commands.Context):
if exception_or_bool := await util.on_pm(ctx.message, self.bot):
raise exception_or_bool
return not exception_or_bool
@commands.command(
aliases=[
"quote",
],
help="Shows a random llama quote.",
)
async def llama(self, ctx):
# the reason why I don't use random.choice is because it may give two of the same result consecutively.
quote = self.quotes[self.quote_index]
await ctx.send(
embed=discord.Embed(
title="Llama quote that'll make your day", description=quote
)
)
self.quote_index += 1 # get next quote next time this command is called
if self.quote_index > (len(self.quotes) - 1): # reshuffle and reset index
random.shuffle(self.quotes)
self.quote_index = 0
@commands.command(
aliases=[
"pp",
],
help="""Detects user's penis length and arranges them from largest to smallest.
This is 101% accurate.""",
usage="""> {prefix}{command} *<user>
ex:
Show penis length of <@501277805540147220> and <@641574882382970891>
> {prefix}{command} <@501277805540147220> <@641574882382970891>
""",
)
async def penis(self, ctx, *users: discord.Member):
dongs = {}
msg = ""
state = random.getstate()
for user in users:
random.seed(user.id)
random_size = random.randint(0, 30)
dongs[user] = ["8{}D".format("=" * random_size), random_size]
if not users:
random.seed(ctx.author.id)
random_size = random.randint(0, 30)
dongs[ctx.author] = ["8{}D".format("=" * random_size), random_size]
random.setstate(state)
dongs = sorted(dongs.items(), key=lambda x: x[1][1])
for user, dong in dongs:
msg += "**%s's size: ( %s )**\n%s\n" % (user.mention, dong[1], dong[0])
await ctx.send(
embed=discord.Embed(title="Here's your pp list", description=msg)
)
@commands.command(
help="Call for help.",
)
async def fix(self, ctx: discord.ext.commands.Context):
"""
Bug report feature.
Removed due to excessive pinging
old code:
await ctx.send(f"Yo {', '.join([f'<@{fixer_id}>' for fixer_id in self.bot.fixer_ids])} fix this shit")
"""
await ctx.send(
f"Fuck you {ctx.message.author.mention} <:coronalol:692323993419776020>"
)
@commands.command(
help="Shows image matching search term",
usage="""> {prefix}{command} <target>
Can chose from:
**NSFW**:
`feet`, `yuri`, `trap`, `futanari`, `hololewd`, `lewdkemo`, `solog`, `feetg`, `cum`, `erokemo`, `les`, `wallpaper`, `lewdk`, `tickle`, `lewd`, `eroyuri`, `eron`, `cum_jpg`, `bj`, 'nsfw_neko_gif', `solo`, `kemonomimi`, `nsfw_avatar`, `gasm`, `anal`, `hentai`, `avatar`, `erofeet`, `holo`, `keta`, `blowjob`, `pussy`, `tits`, `holoero`, `pussy_jpg`, `pwankg`, `classic`, `kuni`, `kiss`, `femdom`, `neko`, `spank`, `erok`, `boobs`, `random_hentai_gif`, `smallboobs`, `ero`
**non NSFW**:
`feed`, `gecg`, `poke`, `slap`, `lizard`, `waifu`, `pat`, `8ball`, `cuddle`, `fox_girl`, `hug`, `smug`, `goose`, `baka`, `woof`
ex:
> {prefix}{command} neko""",
)
async def img(self, ctx, target: str):
non_nsfw = [
"feed",
"gecg",
"poke",
"slap",
"lizard",
"waifu",
"pat",
"8ball",
"cuddle",
"fox_girl",
"hug",
"smug",
"goose",
"baka",
"woof",
]
nsfw = [
"feet",
"yuri",
"trap",
"futanari",
"hololewd",
"lewdkemo",
"solog",
"feetg",
"cum",
"erokemo",
"les",
"wallpaper",
"lewdk",
"tickle",
"lewd",
"eroyuri",
"eron",
"cum_jpg",
"bj",
"nsfw_neko_gif",
"solo",
"kemonomimi",
"nsfw_avatar",
"gasm",
"anal",
"hentai",
"avatar",
"erofeet",
"holo",
"keta",
"blowjob",
"pussy",
"tits",
"holoero",
"pussy_jpg",
"pwankg",
"classic",
"kuni",
"kiss",
"femdom",
"neko",
"spank",
"erok",
"boobs",
"random_hentai_gif",
"smallboobs",
"ero",
]
in_nsfw = target in nsfw
in_non_nsfw = target in non_nsfw
if not (in_nsfw or in_non_nsfw):
await ctx.send(
embed=discord.Embed(
title="Oops",
description="Image category you're looking for doesn't exist. Here's all we got:",
)
.add_field(name="NSFW", value=f"`{'`, `'.join(nsfw)}`", inline=False)
.add_field(
name="non NSFW", value=f"`{'`, `'.join(non_nsfw)}`", inline=False
)
)
return
if in_nsfw and not ctx.message.channel.is_nsfw():
raise discord.ext.commands.errors.NSFWChannelRequired
image_url = nekos.img(target.lower())
await ctx.send(
embed=discord.Embed(
title="Image",
description=f"requested by: {ctx.message.author.mention}\n**[Link if you don't see the image]({image_url})**",
)
.set_image(url=image_url)
.set_footer(text=f"powered by nekos.life")
)
@commands.command(
help="Shows useless facts.",
)
async def fact(self, ctx):
await ctx.send(
embed=discord.Embed(title="Fact of the day", description=nekos.fact())
)
@commands.command(
aliases=[
"clapify",
],
help="does the karen clap thing. Does not work with external emojis",
)
async def clap(self, ctx, *args):
await ctx.send(" :clap: ".join(args))
def setup(bot):
bot.add_cog(Fun(bot))

View file

@ -1,458 +0,0 @@
from llama import Llama
from . import _util as util
import discord
from discord.ext import commands
import re
import datetime
import operator
import traceback
from threading import Timer
class Pinning(commands.Cog):
def __init__(self, bot):
self.bot: Llama = bot
# Emojis that will be used to pin a message
self.pin_emojis: set[discord.PartialEmoji] = set()
# A set of (channel_id, message_id) tuple that are waiting for pin cooldown.
# Messages in this list can't be pinned.
# This is here to prevent pin spamming or accidental double pin
self.recently_pinned_messages: set[(int, int)] = set()
# message pin cooldown per message measured in seconds
self.pin_cooldown: float = 10.0
# A set of (source_channel_id, destination_channel_id) tuple where the pins will be mapped.
self.channel_maps: set[(int, int)] = self.channel_maps_read()
# A set of discord text channels with reaction pinning enabled.
# not using channel ids and position because it makes the code messier without providing much performace benefits.
self.pinnable_channels: set[discord.TextChannel] = self.pinnable_channels_read()
# emoji_raw is either unicode string or int (emoji name or emoji id)
for emoji_raw in self.bot.VARS["pinbot"]["pin_reaction"]:
try:
self.pin_emojis.add(util.convert_to_partial_emoji(emoji_raw, self.bot))
except Exception:
traceback.print_exc()
self.help_msg = ""
self.main_help_fields = [
[
"Pinning",
f"""Add one of these reactions to pin a message: {' | '.join(map(str, self.pin_emojis))}
You'll need at least one of the following roles to use this feature: {' | '.join([role.mention for role in self.bot.PIN_PERMISSIONS])}""",
],
]
async def cog_check(self, ctx: commands.Context):
if exception_or_bool := await util.on_pm(ctx.message, self.bot):
raise exception_or_bool
return not exception_or_bool
def channel_maps_read(self) -> set[(int, int)]:
res: set[(int, int)] = set()
self.bot.VARS["pinbot"] = self.bot.llama_firebase.read("vars", "pinbot")
for map_str in self.bot.VARS["pinbot"]["maps"]:
try:
map_ids = map_str.split(",")
assert len(map_ids) == 2
source_channel_id, destination_channel_id = map(int, map_ids)
source_channel = self.bot.get_channel(source_channel_id)
destination_channel = self.bot.get_channel(destination_channel_id)
assert source_channel
assert destination_channel
res.add((source_channel, destination_channel))
except (AssertionError, ValueError):
traceback.print_exc()
return res
def channel_maps_write(self):
self.bot.llama_firebase.write(
"vars",
"pinbot",
"maps",
["%s,%s" % (i[0].id, i[1].id) for i in self.channel_maps],
)
def pinnable_channels_read(self) -> set[int]:
res: set[discord.TextChannel] = set()
self.bot.VARS["pinbot"] = self.bot.llama_firebase.read("vars", "pinbot")
for channel_id in self.bot.VARS["pinbot"]["allowed_channels"]:
try:
channel: discord.TextChannel = self.bot.get_channel(int(channel_id))
assert channel, f"channel {channel_id} does not exist in the LP server"
res.add(channel)
except (AssertionError, ValueError):
traceback.print_exc()
return res
def pinnable_channels_write(self):
self.bot.llama_firebase.write(
"vars",
"pinbot",
"allowed_channels",
["%s" % channel.id for channel in self.pinnable_channels],
)
async def message2embed(self, ctx: commands.Context, message: discord.Message):
# todo: check if suppress_embeds flag is set
has_video = False
new_embed = discord.Embed()
new_embed.description = message.content[:1900] + (
"..." if len(message.content) > 1900 else ""
)
new_embed.add_field(
name="pin info",
value=f"channel: {message.channel.mention}\n[Jump to message]({message.jump_url})",
inline=False,
)
if message.embeds:
original_embed = message.embeds[0]
if not original_embed.video:
new_embed.title = original_embed.title
new_embed.colour = original_embed.colour
new_embed.url = original_embed.url
if original_embed.description:
new_embed.add_field(
name="content", value=original_embed.description, inline=False
)
for field in original_embed.fields:
new_embed.add_field(
name=field.name, value=field.value, inline=field.inline
)
if original_embed.image:
new_embed.set_image(url=original_embed.image.url)
if original_embed.thumbnail:
new_embed.set_thumbnail(url=original_embed.thumbnail.url)
if original_embed.footer:
new_embed.set_footer(
text=original_embed.footer.text,
icon_url=original_embed.footer.icon_url,
)
else:
has_video = True
files = [
await attachment.to_file(spoiler=attachment.is_spoiler())
for attachment in message.attachments
]
if has_video:
await ctx.send("\n".join(util.url_from_str(message.content)), files=files)
else:
await ctx.send(embed=new_embed, files=files)
# reaction pinning
@commands.Cog.listener()
async def on_raw_reaction_add(self, payload: discord.RawReactionActionEvent):
if payload.emoji not in self.pin_emojis:
return
channel: discord.TextChannel = self.bot.get_channel(payload.channel_id)
message: discord.Message = await channel.fetch_message(payload.message_id)
if channel not in self.pinnable_channels:
return
channel_message_tuple: tuple(int, int) = (channel.id, message.id)
if channel_message_tuple in self.recently_pinned_messages:
await channel.send(
embed=discord.Embed(
description=f"waiting for {self.pin_cooldown} second message pin cooldown!"
)
)
return
original_message_content = f"pinning [a message]({message.jump_url}) as requested by: {payload.member.mention}"
original_message = await channel.send(
embed=discord.Embed(description=original_message_content)
)
# if the user has any role with pinning permission
if not util.lists_has_intersection(
self.bot.PIN_PERMISSIONS, payload.member.roles
):
await original_message.edit(
embed=discord.Embed(
description=original_message_content
+ f"\n:exclamation: FAILED\nTo pin a message, you need at lest one of the following roles:\n-"
+ ("\n-".join([role.mention for role in self.bot.PIN_PERMISSIONS]))
)
)
return
if message.pinned:
await original_message.edit(
embed=discord.Embed(
description=original_message_content + "\n:pushpin: Already pinned!"
)
)
return
try:
await message.pin()
except Exception as err:
err_type = type(err)
why_fail = "Unknown reason"
if err_type == discord.Forbidden:
why_fail = "Bot does not have permission"
elif err_type == discord.NotFound:
why_fail = "message not found"
elif err_type == discord.HTTPException:
# channel as more then 50 pins, message is a system message, etc.
pass
await original_message.edit(
embed=discord.Embed(
description=original_message_content
+ f"\n:exclamation: FAILED ({why_fail}) :exclamation:"
)
)
return
self.recently_pinned_messages.add(channel_message_tuple)
Timer(
self.pin_cooldown,
lambda: self.recently_pinned_messages.remove(channel_message_tuple),
).start()
await original_message.edit(
embed=discord.Embed(description=original_message_content + "\nsuccess!")
)
# pin archiving
@commands.Cog.listener()
async def on_guild_channel_pins_update(
self, channel: discord.abc.GuildChannel, last_pin: datetime.datetime
):
# print(channel, last_pin)
pass
@commands.command(
aliases=[
"rp",
],
help="Enables or Disables reaction pinning for given channels",
usage="""> {prefix}{command} <enable|disable> *<channel ID|channel mention>
ex:
- Enabling reaction pinning in <#761546616036524063> and <#764367748125949952>
> {prefix}{command} enable <#761546616036524063> <#764367748125949952>
or
> {prefix}{command} e <#761546616036524063> <#764367748125949952>
- Disabling reaction pinning for <#761546616036524063> and <#764367748125949952>
> {prefix}{command} disable <#761546616036524063> <#764367748125949952>
or
> {prefix}{command} d <#761546616036524063> <#764367748125949952>""",
)
@util.must_be_admin()
async def reactionpin(
self, ctx: commands.Context, operation: str, *raw_channels: str
):
channels: set[int] = set()
if not raw_channels:
await ctx.send(
embed=discord.Embed(
title="No channels were given",
description="No channels were given as input. Aborting.",
)
)
return
for channel_str in raw_channels:
try:
channel: discord.channel.TextChannel = self.bot.LP_SERVER.get_channel(
int(re.findall(r"\d+", channel_str)[0])
)
assert channel
channels.add(channel)
except (ValueError, IndexError, AssertionError):
await ctx.send(
embed=discord.Embed(
title="Oops!",
description=f"{channel_str} (`{channel_str}`) is not a valid channel.",
)
)
return
def channels_to_mentions():
return "\n-".join([channel.mention for channel in channels])
if operation in ["enable", "e"]:
self.pinnable_channels.update(channels)
self.pinnable_channels_write()
await ctx.send(
embed=discord.Embed(
title="Enabled Reaction pinning!",
description=f"People can now react in:\n-{channels_to_mentions()}\n with {' '.join(map(str, self.pin_emojis))} to pin a message!",
)
)
elif operation in ["disable", "d"]:
self.pinnable_channels.difference_update(channels)
self.pinnable_channels_write()
await ctx.send(
embed=discord.Embed(
title="Disabled Reaction pinning!",
description=f"Adding reactions to messages in \n-{channels_to_mentions()}\n will **NOT** pin them now.",
)
)
else:
raise discord.ext.commands.ArgumentParsingError
# not in reactionpin because it requires admin permission to run
@commands.command(
help="Shows channels with reaction pinning enabled.",
)
async def pinnable(self, ctx: commands.Context):
# todo: fix channels with no category not ordered properly
# sort in order they're arranged in discord (sort by "position" attribute)
sorted_channels: list[discord.abc.GuildChannel] = sorted(
self.pinnable_channels, key=operator.attrgetter("position")
)
description: str = ""
previous_channel_category_id: int = int()
previous_channel_had_category: bool = bool()
for channel in sorted_channels:
if channel.category_id:
previous_channel_had_category = True
if previous_channel_category_id != channel.category_id:
previous_channel_category_id = channel.category_id
description += f"\n**{channel.category.name.upper()}**\n"
description += f" - {channel.mention}\n"
else:
if previous_channel_had_category:
description += f"\n"
previous_channel_had_category = False
description += f"- {channel.mention}\n"
if not sorted_channels:
description = "No channels have reaction pinning enabled yet."
await ctx.send(
embed=discord.Embed(
title="Channels with reaction pinning enabled", description=description
)
)
@commands.command(
help="Creates or removes pin mapping from source channel to destination channel.",
usage="""> {prefix}{command} <operation: create | c | destroy | d> <source_channel> <destination_channel>
ex:
- Using channel mentions
> {prefix}{command} create <#761546616036524063> <#764029864033779732>
> {prefix}{command} destroy <#761546616036524063> <#764029864033779732>
- Using channel ids
> {prefix}{command} create 761546616036524063 764029864033779732
> {prefix}{command} destroy 761546616036524063 764029864033779732""",
)
@util.must_be_admin()
async def map(
self,
ctx: commands.Context,
operation: str,
source_channel_raw: str,
destination_channel_raw: str,
):
op1 = ["create", "c"]
op2 = ["destroy", "d"]
operation_mode = 0 # 0: None, 1: Create, 2: Destroy
if operation in op1:
operation_mode = 1
if operation in op2:
operation_mode = 2
if not operation_mode:
raise discord.ext.commands.errors.BadArgument
try:
source_channel = self.bot.LP_SERVER.get_channel(
int(re.findall(r"\d+", source_channel_raw)[0])
)
destination_channel = self.bot.LP_SERVER.get_channel(
int(re.findall(r"\d+", destination_channel_raw)[0])
)
except (ValueError, IndexError):
raise discord.ext.commands.errors.BadArgument
if not (source_channel or destination_channel):
await ctx.send(
embed=discord.Embed(
title="Smh",
description="That source and destination channel doesn't exist",
)
)
return
if not source_channel:
await ctx.send(
embed=discord.Embed(
title="Oopsies!", description="That source channel doesn't exist!"
)
)
return
if not destination_channel:
await ctx.send(
embed=discord.Embed(
title="Nope!", description="That destination channel doesn't exist!"
)
)
return
new = (source_channel, destination_channel)
new_map = f"{source_channel.mention}{destination_channel.mention}"
if operation_mode == 1:
if new in self.channel_maps:
await ctx.send(
embed=discord.Embed(
title="Hmmm", description=f"map\n{new_map}\nalready exists"
)
)
return
self.channel_maps.add(new)
await ctx.send(
embed=discord.Embed(title="Pin map registered!", description=new_map)
)
elif operation_mode == 2:
if new not in self.channel_maps:
await ctx.send(
embed=discord.Embed(
title="Wait", description=f"map\n{new_map}\nalready removed"
)
)
return
self.channel_maps.discard(new)
await ctx.send(
embed=discord.Embed(title="Pin map removed!", description=new_map)
)
self.channel_maps_write()
@commands.command(help="Shows list of pin source and destination channels.")
async def maps(self, ctx: commands.Context):
# todo: order channels
description = ""
if self.channel_maps:
for source_channel, destination_channel in self.channel_maps:
description += (
f"{source_channel.mention}{destination_channel.mention}\n"
)
else:
description = "No maps yet."
await ctx.send(embed=discord.Embed(title="Pin maps", description=description))
def setup(bot: commands.Bot):
bot.add_cog(Pinning(bot))

View file

@ -1,59 +0,0 @@
from llama import Llama
from . import _util as util
import discord
from discord.ext import commands
from datetime import datetime
class Util(commands.Cog):
def __init__(self, bot):
self.bot: Llama = bot
async def cog_check(self, ctx: commands.Context):
if exception_or_bool := await util.on_pm(ctx.message, self.bot):
raise exception_or_bool
return not exception_or_bool
@commands.command(
help="Calculates of when a discord ID (aka snowflake) was created.",
usage="""> {prefix}{command} <snowflake>
ex:
> {prefix}{command} 501277805540147220""",
)
async def snowflake(self, ctx, snowflake_to_parse):
try:
datetime_data = util.snowflake_to_datetime(snowflake_to_parse).strftime(
"%Y-%m-%d %H:%M:%S"
)
except ValueError:
await ctx.send(
embed=discord.Embed(
description="Input must be a valid discord snowflake (i.e. a number)"
)
)
return
await ctx.send(
embed=discord.Embed(
description=f"snowflake `{snowflake_to_parse}` was created in: `{datetime_data}`"
)
)
@commands.command(
help="Measures communication delay (latency) in 1/1000 of a second, also known as millisecond (ms).",
)
async def ping(self, ctx):
message_latency = int(
(datetime.now() - ctx.message.created_at).microseconds / 1000
)
embed = (
discord.Embed(description="**TR1GGERED**")
.add_field(name="Message latency", value=f"{message_latency}ms")
.add_field(name="API latency", value=f"{int(self.bot.latency * 1000)}ms")
)
await ctx.send(embed=embed)
def setup(bot):
bot.add_cog(Util(bot))

View file

@ -1,223 +0,0 @@
from llama_firebase import LlamaFirebase
import cogs._util as util
import discord
from discord.ext import commands
from os import listdir, path
from time import time
from typing import Union
import traceback
import json
def resolve_path(relative_path: str):
"""
Converts relative path to absolute path for when the bot was executed in arbitrary path
"""
return path.abspath(path.join(path.dirname(__file__), relative_path))
class Llama(commands.Bot):
# the server the bot is intended to run on. It's useful to have this reference available.
LP_SERVER: discord.Guild
server_id: int = 457373827073048604
# IDs of users who can run owners only commands
owner_id: int = 501277805540147220
owner_ids: set = {owner_id, 396333737148678165}
def __init__(self, firebase_cred_path: str, prefix: str = "-"):
super().__init__(
help_command=None, # to overwrite with custom help command
command_prefix=prefix,
case_insensitive=True, # allow mix of lower cae and upper case for commands
)
# create firestore interface
self.llama_firebase: LlamaFirebase = LlamaFirebase(firebase_cred_path)
# read all configs and data for better responsiveness
self.VARS = self.llama_firebase.read_collection("vars")
# ----- [ DISCORD.PY STUFF ] -----
async def on_ready(self):
"""
variables in this function are checked if they exist before getting defined (reading from firebase which takes time)
because the function on_ready() can be called multiple times as mentined in the discord.py documentaton:
https://discordpy.readthedocs.io/en/latest/api.html#discord.on_ready
"""
# Prevents bot from running in server other than LP's
if not hasattr(self, "LP_SERVER"):
self.LP_SERVER: discord.Guild = next(
(guild for guild in self.guilds if guild.id == self.server_id), None
)
if not self.LP_SERVER:
print("----------[ The bot is not in LP server! ]----------")
exit(69)
# load roles
if not hasattr(self, "self.HIGHEST_ORDER"):
self.HIGHEST_ORDER = self.get_role_from_vars("HIGHEST_ORDER")
if not hasattr(self, "self.LPWB_MEMBER"):
self.LPWB_MEMBER = self.get_role_from_vars("LPWB_MEMBER")
if not hasattr(self, "self.SILK_PERMISSION"):
self.SILK_PERMISSION = self.get_role_from_vars("SILK_PERMISSION")
if not hasattr(self, "self.PYJAMAS"):
self.PYJAMAS = self.get_role_from_vars("PYJAMAS")
# define roles with access to special features
self.LLAMA_PERMS = [
getattr(self, i) for i in self.VARS["settings"]["LLAMA_PERM"]
]
self.PIN_PERMISSIONS = [
getattr(self, i) for i in self.VARS["settings"]["PIN_PERM"]
]
# load cogs at the very last moment as some of them require data from the database
# load all cogs that do not begin with a underscore
cogs_dir = resolve_path("./cogs")
for cog in [
f"cogs.{path.splitext(f)[0]}"
for f in listdir(cogs_dir)
if path.isfile(path.join(cogs_dir, f)) and not f[0] == "_"
]:
print(f"loading cog: {cog}")
try:
self.load_extension(cog)
except commands.ExtensionAlreadyLoaded:
print(f"Extension {cog} was already loaded. Skipping.")
# to show bot uptime
if not hasattr(self, "start_time"):
self.start_time = time()
print(f"{self.user} is up and ready!")
async def on_command_error(
self,
ctx: commands.Context,
error: commands.CommandError,
):
"""
Gets executed when the bot encounters an error.
"""
# ignore error if it's simply a missing command
if isinstance(error, commands.errors.CommandNotFound):
return
err_title: str = f"Error ({type(error).__name__})"
err_description: str = ""
if isinstance(error, util.NotAdminChannel):
err_title = ":lock: Not in admin channel"
err_description = f"This command can only be called in admins channels."
if isinstance(error, commands.errors.NSFWChannelRequired):
err_title = ":lock: Not in nsfw channel"
err_description = "This command can only be called in NSFW channels."
if isinstance(error, commands.errors.NotOwner):
err_title = f":lock: Not a bot owner"
err_description = "Only the bot owner(s) can call that command."
if isinstance(error, commands.errors.BotMissingPermissions):
missing_perms = "".join([f"- {i}\n" for i in error.missing_perms])
err_title = ":exclamation: Missing Permission(s)"
err_description = f"missing the following permissions:\n{missing_perms}"
if isinstance(error, commands.errors.MissingRequiredArgument):
err_title = f":exclamation: Missing required argument(s)"
err_description = f"Consider using the `{self.command_prefix}help {ctx.command}` command to learn how to use it."
if isinstance(
error,
(
commands.errors.BadArgument,
commands.ArgumentParsingError,
),
):
err_title = f":exclamation: Invalid argument(s)"
err_description = f"`Consider using the `{self.command_prefix}help {ctx.command}` command to learn how to use it."
if isinstance(error, commands.errors.MemberNotFound):
err_title = f":exclamation: Member not found"
err_description = f"Member {error.argument} was not found in this server."
# When command failed to complete
if isinstance(error, commands.errors.CommandInvokeError):
err_title = ":exclamation: Command Failed to complete"
err_description = "Encountered an unknown error."
await ctx.send(
embed=discord.Embed(
title=err_title,
description=err_description,
)
.add_field(name="Channel", value=ctx.message.channel.mention)
.add_field(name="Author", value=ctx.author.mention)
.add_field(name="Message", value=f"[Message URL]({ctx.message.jump_url})")
.add_field(name="Content", value=ctx.message.content[:1024], inline=False)
)
# Log details in terminal
print("")
print("=" * 30)
print(type(error))
print("Cog:", ctx.cog)
print("Author:", ctx.author, ctx.author.id)
print("Content:", ctx.message.content)
print("Channel:", ctx.message.channel, ctx.message.channel.id)
print("URL:", ctx.message.jump_url)
print("")
traceback.print_exception(type(error), error, error.__traceback__)
print("=" * 30)
print("")
# ----- [ BOT METHODS ] -----
def get_role_from_vars(self, name) -> Union[discord.Role, None]:
"""
Get discord role by name
"""
return discord.utils.get(self.LP_SERVER.roles, id=int(self.VARS["roles"][name]))
def get_channel_from_vars(self, name) -> discord.abc.GuildChannel:
"""
Get discord channel by name
"""
return self.LP_SERVER.get_channel(int(self.VARS["channels"][name]))
def main():
try:
with open(resolve_path("./config.json"), "rt") as f:
config = json.loads(f.read())
except FileNotFoundError:
# ignore if config.json does not exist
pass
with open(resolve_path("./secrets/secret.json"), "rt") as f:
secret = json.loads(f.read())
# set default prefix if it exists
prefix = (
(config["prefix"] if "prefix" in config else None)
if "config" in locals()
else None
)
firebase_path = resolve_path("./secrets/firebase-adminsdk.json")
llama_bot = Llama(firebase_path, prefix) if prefix else Llama(firebase_path)
llama_bot.run(secret["token"])
if __name__ == "__main__":
main()

View file

@ -1,62 +0,0 @@
import firebase_admin
import firebase_admin.firestore
class LlamaFirebase:
"""
A simple firebase firestore interface I made for the bot.
More information about firestore can be found here:
https://firebase.google.com/docs/firestore
"""
def __init__(self, certificate_path):
cred = firebase_admin.credentials.Certificate(certificate_path)
firebase_admin.initialize_app(cred, {"projectId": cred.project_id})
self.db = firebase_admin.firestore.client()
def read_collection(self, collection_name):
return {
doc.id: doc.to_dict()
for doc in self.db.collection(u"%s" % collection_name).stream()
}
def create(self, collection_name, document_name, data_name, data):
return (
self.db.collection(u"%s" % collection_name)
.document(u"%s" % document_name)
.set({"%s" % data_name: u"%s" % data}, merge=True)
)
def read(self, collection_name, document_name):
return (
self.db.collection(u"%s" % collection_name)
.document(u"%s" % document_name)
.get()
.to_dict()
)
def delete(self, collection_name, document_name, data_name):
return (
self.db.collection(u"%s" % collection_name)
.document(u"%s" % document_name)
.update({u"%s" % data_name: firebase_admin.firestore.DELETE_FIELD})
)
def exists(self, collection_name, discord_id):
return (
True
if self.db.collection(u"%s" % collection_name)
.document(u"%s" % discord_id)
.get()
.exists
else False
)
def write(self, collection_name, document_name, data_name, data, merge=True):
if not isinstance(data, list):
data = u"%s" % data
return (
self.db.collection(u"%s" % collection_name)
.document(u"%s" % document_name)
.set({u"%s" % data_name: data}, merge=merge)
)

View file

@ -1,4 +0,0 @@
discord.py
firebase_admin
nekos.py
Pillow