Amicobot
BIN
__pycache__/exchange.cpython-311.pyc
Normal file
BIN
__pycache__/photo_manager.cpython-311.pyc
Normal file
BIN
__pycache__/voice_manager.cpython-311.pyc
Normal file
BIN
data/faces/piovra.png
Normal file
After Width: | Height: | Size: 139 KiB |
BIN
data/photos/24d76aac-6762-468a-afc0-0a93b8221f33.jpg
Normal file
After Width: | Height: | Size: 118 KiB |
BIN
data/photos/Grigliate_105.jpg
Normal file
After Width: | Height: | Size: 3.4 MiB |
BIN
data/photos/IMG_0046.jpg
Normal file
After Width: | Height: | Size: 3.4 MiB |
BIN
data/photos/IMG_0047.jpg
Normal file
After Width: | Height: | Size: 3.6 MiB |
BIN
data/photos/IMG_0051.jpg
Normal file
After Width: | Height: | Size: 2.8 MiB |
BIN
data/photos/Untitled.jpg
Normal file
After Width: | Height: | Size: 425 KiB |
BIN
data/photos/Untitled2.jpg
Normal file
After Width: | Height: | Size: 378 KiB |
BIN
data/vocal/no.ogg
Normal file
BIN
data/vocal/si.ogg
Normal file
BIN
data/vocal/yes.ogg
Normal file
107
main.py
Normal file
@ -0,0 +1,107 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
|
from telegram import ForceReply, Update, InputFile
|
||||||
|
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
|
||||||
|
|
||||||
|
from pydub import AudioSegment
|
||||||
|
import io
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
|
from voice_manager import VoiceManager
|
||||||
|
from photo_manager import PhotoManager
|
||||||
|
|
||||||
|
|
||||||
|
# Enable logging
|
||||||
|
logging.basicConfig(
|
||||||
|
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
|
||||||
|
)
|
||||||
|
# set higher logging level for httpx to avoid all GET and POST requests being logged
|
||||||
|
logging.getLogger("httpx").setLevel(logging.WARNING)
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
# Define a few command handlers. These usually take the two arguments update and
|
||||||
|
# context.
|
||||||
|
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
|
"""Send a message when the command /start is issued."""
|
||||||
|
user = update.effective_user
|
||||||
|
await update.message.reply_html(
|
||||||
|
rf"Hi {user.mention_html()}!",
|
||||||
|
reply_markup=ForceReply(selective=True),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
|
"""Send a message when the command /help is issued."""
|
||||||
|
await update.message.reply_text("Help!")
|
||||||
|
|
||||||
|
|
||||||
|
async def echo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
|
print("Ricevuto messaggio testuale")
|
||||||
|
"""Echo the user message."""
|
||||||
|
await update.message.reply_text(update.message.text)
|
||||||
|
|
||||||
|
|
||||||
|
async def voice_oracle(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
|
audio_file = await update.message.voice.get_file()
|
||||||
|
audio_bytes = await audio_file.download_as_bytearray()
|
||||||
|
|
||||||
|
await context.bot.send_voice(update.message.chat_id, BytesIO(voice_oracle.vocal_manager.oracle(audio_bytes)))
|
||||||
|
|
||||||
|
voice_oracle.vocal_manager = VoiceManager()
|
||||||
|
|
||||||
|
async def get_photo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
|
print(get_photo.photo_mgr.get_random_photo())
|
||||||
|
await update.message.reply_photo(open(get_photo.photo_mgr.get_random_photo(), 'rb'))
|
||||||
|
|
||||||
|
get_photo.photo_mgr = PhotoManager()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
async def replace_faces(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||||
|
|
||||||
|
# user = update.message.from_user
|
||||||
|
|
||||||
|
photo_file = await update.message.photo[-1].get_file()
|
||||||
|
|
||||||
|
photo_bytes = await photo_file.download_as_bytearray()
|
||||||
|
|
||||||
|
out_img_bytes = replace_faces.photo_mgr.replace_faces(photo_bytes)
|
||||||
|
await update.message.reply_photo(InputFile(io.BytesIO(out_img_bytes)))
|
||||||
|
|
||||||
|
# await photo_file.download_to_drive("user_photo.jpg")
|
||||||
|
|
||||||
|
# logger.info("Photo of %s: %s", user.first_name, "user_photo.jpg")
|
||||||
|
|
||||||
|
replace_faces.photo_mgr = PhotoManager()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
|
||||||
|
"""Start the bot."""
|
||||||
|
# Create the Application and pass it your bot's token.
|
||||||
|
application = Application.builder().token("6583917096:AAG2SifgcHoQEqzprVp57ZVbxVgwc8Mygpo").build()
|
||||||
|
|
||||||
|
# on different commands - answer in Telegram
|
||||||
|
application.add_handler(CommandHandler("start", start))
|
||||||
|
application.add_handler(CommandHandler("help", help_command))
|
||||||
|
application.add_handler(CommandHandler("photo", get_photo))
|
||||||
|
|
||||||
|
|
||||||
|
# on non command i.e message - echo the message on Telegram
|
||||||
|
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo))
|
||||||
|
|
||||||
|
application.add_handler(MessageHandler(filters.VOICE, voice_oracle))
|
||||||
|
|
||||||
|
application.add_handler(MessageHandler(filters.PHOTO, replace_faces))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# Run the bot until the user presses Ctrl-C
|
||||||
|
application.run_polling(allowed_updates=Update.ALL_TYPES)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
40
overlay_test.py
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
back = cv2.imread('/home/luca/git_repos/telegram_amicobot/data/photos/Untitled.jpg')
|
||||||
|
overlay = cv2.imread('/home/luca/git_repos/telegram_amicobot/data/faces/piovra.png', cv2.IMREAD_UNCHANGED) # Load with alpha channel
|
||||||
|
|
||||||
|
scale_percent = 200 # percent of original size
|
||||||
|
width = int(overlay.shape[1] * scale_percent / 100)
|
||||||
|
height = int(overlay.shape[0] * scale_percent / 100)
|
||||||
|
dim = (width, height)
|
||||||
|
|
||||||
|
# resize image
|
||||||
|
overlay = cv2.resize(overlay, dim, interpolation = cv2.INTER_AREA)
|
||||||
|
|
||||||
|
h, w = back.shape[:2]
|
||||||
|
h1, w1 = overlay.shape[:2]
|
||||||
|
|
||||||
|
# let store center coordinate as cx, cy
|
||||||
|
cx, cy = (w - w1) // 2, (h - h1) // 2 # Note the order of width and height here
|
||||||
|
|
||||||
|
# Ensure the overlay image has an alpha channel
|
||||||
|
if overlay.shape[2] == 3:
|
||||||
|
overlay = cv2.cvtColor(overlay, cv2.COLOR_BGR2BGRA)
|
||||||
|
|
||||||
|
# Create masks for overlay and background
|
||||||
|
overlay_mask = overlay[:, :, 3] / 255.0
|
||||||
|
background_mask = 1.0 - overlay_mask
|
||||||
|
|
||||||
|
# Resize overlay image to fit within specified region
|
||||||
|
overlay_resized = cv2.resize(overlay, (w1, h1))
|
||||||
|
|
||||||
|
# Blend the images using alpha blending
|
||||||
|
for c in range(0, 3):
|
||||||
|
back[cy:cy + h1, cx:cx + w1, c] = (overlay_resized[:, :, c] * overlay_mask +
|
||||||
|
back[cy:cy + h1, cx:cx + w1, c] * background_mask)
|
||||||
|
|
||||||
|
# View result
|
||||||
|
cv2.imshow('back with overlay', back)
|
||||||
|
cv2.waitKey(0)
|
||||||
|
cv2.destroyAllWindows()
|
139
photo_manager.py
Normal file
@ -0,0 +1,139 @@
|
|||||||
|
import os
|
||||||
|
import random
|
||||||
|
from ultralytics import YOLO
|
||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
class PhotoManager:
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.parent_folder = "/home/luca/git_repos/telegram_amicobot/data/photos"
|
||||||
|
self.file_extension = "jpg"
|
||||||
|
|
||||||
|
def get_random_photo(self):
|
||||||
|
files = [f for f in os.listdir(self.parent_folder) if os.path.isfile(os.path.join(self.parent_folder, f))]
|
||||||
|
if not files:
|
||||||
|
raise FileNotFoundError("No files found in the specified folder")
|
||||||
|
|
||||||
|
return os.path.join(self.parent_folder, random.choice(files))
|
||||||
|
|
||||||
|
def replace_faces(self, photo_bytes):
|
||||||
|
|
||||||
|
# Convert the byte array to a NumPy array
|
||||||
|
nparr = np.frombuffer(photo_bytes, np.uint8)
|
||||||
|
|
||||||
|
# Decode the image
|
||||||
|
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
|
||||||
|
original_img = img
|
||||||
|
|
||||||
|
# Specify the desired padding size
|
||||||
|
padding_perc = 5
|
||||||
|
padding_size = int(img.shape[1] * (1+padding_perc/100))
|
||||||
|
|
||||||
|
# Calculate new dimensions
|
||||||
|
new_width = img.shape[1] + 2 * padding_size
|
||||||
|
new_height = img.shape[0] + 2 * padding_size
|
||||||
|
|
||||||
|
# Create a blank image with the new dimensions
|
||||||
|
padded_image = np.zeros((new_height, new_width, 3), dtype=np.uint8)
|
||||||
|
|
||||||
|
# Copy the original image into the center of the blank image
|
||||||
|
padded_image[padding_size:padding_size + img.shape[0], padding_size:padding_size + img.shape[1]] = img
|
||||||
|
img = padded_image
|
||||||
|
|
||||||
|
# Load a pre-trained YOLOv8n model
|
||||||
|
# Net downloaded from https://github.com/noorkhokhar99/face-detection-yolov8
|
||||||
|
model = YOLO('yolov8n-face.pt')
|
||||||
|
names = model.model.names
|
||||||
|
|
||||||
|
# Perform inference on 'bus.jpg' with specified parameters with conf=0.5
|
||||||
|
results = model.predict(img, verbose=False, conf=0.3, device='cpu')
|
||||||
|
|
||||||
|
# Process detections
|
||||||
|
boxes = results[0].boxes.xywh.cpu()
|
||||||
|
clss = results[0].boxes.cls.cpu().tolist()
|
||||||
|
confs = results[0].boxes.conf.float().cpu().tolist()
|
||||||
|
|
||||||
|
# for box, cls, conf in zip(boxes, clss, confs):
|
||||||
|
# print(f"Class Name: {names[int(cls)]}, Confidence Score: {conf}, Bounding Box: {box}")
|
||||||
|
|
||||||
|
for box, cls, conf in zip(boxes, clss, confs):
|
||||||
|
x, y, w, h = map(int, box)
|
||||||
|
class_name = names[int(cls)]
|
||||||
|
|
||||||
|
# Calculate the bottom-right corner coordinates
|
||||||
|
x1, y1 = x - w/2, y - h/2
|
||||||
|
x2, y2 = x + w/2, y + h/2
|
||||||
|
|
||||||
|
x1 = int(x1)
|
||||||
|
y1 = int(y1)
|
||||||
|
x2 = int(x2)
|
||||||
|
y2 = int(y2)
|
||||||
|
|
||||||
|
# # Draw bounding box
|
||||||
|
# cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)
|
||||||
|
|
||||||
|
# # Display class name and confidence
|
||||||
|
# label = f"{class_name}: {conf:.2f}"
|
||||||
|
# cv2.putText(img, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
|
||||||
|
|
||||||
|
overlay = cv2.imread('/home/luca/git_repos/telegram_amicobot/data/faces/piovra.png', cv2.IMREAD_UNCHANGED) # Load with alpha channel
|
||||||
|
|
||||||
|
overlay_size_h = overlay.shape[0]
|
||||||
|
box_size_h = h
|
||||||
|
scale_h = (box_size_h/overlay_size_h) * 100
|
||||||
|
|
||||||
|
overlay_size_w = overlay.shape[1]
|
||||||
|
box_size_w = w
|
||||||
|
scale_w = (box_size_w/overlay_size_w) * 100
|
||||||
|
|
||||||
|
|
||||||
|
scale_percent = max(scale_h, scale_w) * 1.5 # percent of original size
|
||||||
|
width = int(overlay.shape[1] * scale_percent / 100)
|
||||||
|
height = int(overlay.shape[0] * scale_percent / 100)
|
||||||
|
dim = (width, height)
|
||||||
|
|
||||||
|
# resize image
|
||||||
|
overlay = cv2.resize(overlay, dim, interpolation = cv2.INTER_AREA)
|
||||||
|
|
||||||
|
h, w = img.shape[:2]
|
||||||
|
h1, w1 = overlay.shape[:2]
|
||||||
|
|
||||||
|
# let store center coordinate as cx, cy
|
||||||
|
cx, cy = x, y#(w - w1) // 2, (h - h1) // 2 # Note the order of width and height here
|
||||||
|
cx = cx - int(overlay.shape[1]/2)
|
||||||
|
cy = cy - int(overlay.shape[0]/2) - int(overlay.shape[0] * 0.15)
|
||||||
|
|
||||||
|
if(cx < 0):
|
||||||
|
cx = 0
|
||||||
|
if(cy < 0):
|
||||||
|
cy = 0
|
||||||
|
|
||||||
|
# Ensure the overlay image has an alpha channel
|
||||||
|
if overlay.shape[2] == 3:
|
||||||
|
overlay = cv2.cvtColor(overlay, cv2.COLOR_BGR2BGRA)
|
||||||
|
|
||||||
|
# Create masks for overlay and background
|
||||||
|
overlay_mask = overlay[:, :, 3] / 255.0
|
||||||
|
background_mask = 1.0 - overlay_mask
|
||||||
|
|
||||||
|
# Resize overlay image to fit within specified region
|
||||||
|
overlay_resized = cv2.resize(overlay, (w1, h1))
|
||||||
|
|
||||||
|
# Blend the images using alpha blending
|
||||||
|
for c in range(0, 3):
|
||||||
|
img[cy:cy + h1, cx:cx + w1, c] = (overlay_resized[:, :, c] * overlay_mask +
|
||||||
|
img[cy:cy + h1, cx:cx + w1, c] * background_mask)
|
||||||
|
|
||||||
|
|
||||||
|
img = img[padding_size:padding_size + original_img.shape[0], padding_size:padding_size + original_img.shape[1]]
|
||||||
|
# Convert OpenCV image to bytes
|
||||||
|
_, image_bytes = cv2.imencode('.jpg', img)
|
||||||
|
image_bytes = image_bytes.tobytes()
|
||||||
|
|
||||||
|
return image_bytes
|
||||||
|
|
||||||
|
# cv2.imshow('Image from Byte Array', img)
|
||||||
|
# cv2.waitKey(0)
|
||||||
|
# cv2.destroyAllWindows()
|
BIN
user_photo.jpg
Normal file
After Width: | Height: | Size: 199 KiB |
21
voice_manager.py
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
from pydub import AudioSegment
|
||||||
|
from io import BytesIO
|
||||||
|
import random
|
||||||
|
|
||||||
|
class VoiceManager:
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.yes = AudioSegment.from_ogg("/home/luca/git_repos/telegram_amicobot/data/vocal/si.ogg")
|
||||||
|
self.no = AudioSegment.from_ogg("/home/luca/git_repos/telegram_amicobot/data/vocal/no.ogg")
|
||||||
|
|
||||||
|
def oracle(self, vocal_message):
|
||||||
|
vm = AudioSegment.from_file(BytesIO(vocal_message), format="ogg")
|
||||||
|
|
||||||
|
random_number = random.choice([0, 1])
|
||||||
|
|
||||||
|
if random_number==0:
|
||||||
|
combined = vm + self.no
|
||||||
|
else:
|
||||||
|
combined = vm + self.yes
|
||||||
|
|
||||||
|
return combined.export(format="ogg", codec="libopus").read()
|
17
yolo_test.py
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
from ultralytics import YOLO
|
||||||
|
|
||||||
|
# Load a pre-trained YOLOv8n model
|
||||||
|
# Net downloaded from https://github.com/noorkhokhar99/face-detection-yolov8
|
||||||
|
model = YOLO('yolov8n-face.pt')
|
||||||
|
names = model.model.names
|
||||||
|
|
||||||
|
# Perform inference on 'bus.jpg' with specified parameters with conf=0.5
|
||||||
|
results = model.predict("/home/luca/git_repos/telegram_amicobot/data/photos/Grigliate_105.jpg", verbose=False, conf=0.7, device='cpu')
|
||||||
|
|
||||||
|
# Process detections
|
||||||
|
boxes = results[0].boxes.xywh.cpu()
|
||||||
|
clss = results[0].boxes.cls.cpu().tolist()
|
||||||
|
confs = results[0].boxes.conf.float().cpu().tolist()
|
||||||
|
|
||||||
|
for box, cls, conf in zip(boxes, clss, confs):
|
||||||
|
print(f"Class Name: {names[int(cls)]}, Confidence Score: {conf}, Bounding Box: {box}")
|