332 lines
10 KiB
Python
332 lines
10 KiB
Python
import subprocess
|
|
import PyPDF2
|
|
import os
|
|
import shutil
|
|
import sys
|
|
import qrcode
|
|
import json
|
|
import string
|
|
import random
|
|
import requests
|
|
|
|
def clear_workspace(files):
    """Delete every existing path listed in ``files``.

    Nothing happens unless ``files`` is a non-empty ``list``. Entries that do
    not exist on disk are skipped.
    """
    if not files or not isinstance(files, list):
        return

    # filter() is lazy, so each existence check still runs immediately before
    # the corresponding removal.
    for target in filter(os.path.exists, files):
        os.remove(target)
|
|
|
|
|
|
def delete_folder(folder_path):
    """Remove ``folder_path`` and everything below it.

    Any failure is reported on stdout instead of being raised.
    """
    try:
        shutil.rmtree(folder_path)
    except Exception as error:
        print(f"Error deleting the folder {folder_path}: {error}")
|
|
|
|
|
|
def merge_pdfs(pdf1_path, pdf2_path, output_path):
    """Concatenate two PDF files into ``output_path``.

    All pages of ``pdf1_path`` come first, followed by all pages of
    ``pdf2_path``.
    """
    with open(pdf1_path, 'rb') as first_src, open(pdf2_path, 'rb') as second_src:
        first_reader = PyPDF2.PdfReader(first_src)
        second_reader = PyPDF2.PdfReader(second_src)
        merged = PyPDF2.PdfWriter()

        # Readers are visited in argument order, which fixes the page order.
        for reader in (first_reader, second_reader):
            for page in reader.pages:
                merged.add_page(page)

        # The output is written before the inputs are closed, in case page
        # content is still read from them at write time.
        with open(output_path, 'wb') as sink:
            merged.write(sink)
|
|
|
|
|
|
# If a script fails, its files are moved to the oldFiles directory, so we need to move them back so that the paths defined in the files are correct
|
|
def move_files_back_from_old_files():
    """Move every entry of ``oldFiles`` back into the current directory.

    The emptied ``oldFiles`` directory is removed afterwards. Nothing happens
    when ``oldFiles`` does not exist.
    """
    if not os.path.exists("oldFiles"):
        return

    for entry in os.listdir("oldFiles"):
        shutil.move(os.path.join("oldFiles", entry), entry)
    os.rmdir("oldFiles")
|
|
|
|
|
|
# copy the tasks files from another task to the current runningTasks to execute it
|
|
# make sure the Python script in your work directory does not have the same name as the target script, otherwise it will be overwritten
|
|
def execute_another_group_task(folder_path, script_file_name):
    """Copy another task's files into the current directory and run its script.

    After copying, ``index.json`` and ``requirements.txt`` are removed from
    the working directory, then ``script_file_name`` is executed.
    """
    copy_files(folder_path, os.getcwd())
    clear_workspace(["index.json", "requirements.txt"])
    execute_python_file(script_file_name)
|
|
|
|
|
|
def execute_python_file(file_path):
    """Run a Python script with ``python3.9``, forwarding this process's CLI args.

    Exits with status 1 when the file is missing, does not end in ``.py``, or
    the run fails. Only the base name of ``file_path`` is passed to the
    interpreter, so the script is resolved relative to the current working
    directory.
    """
    problem = None
    if not os.path.exists(file_path):
        problem = f"The file {file_path} does not exist."
    elif not file_path.endswith('.py'):
        problem = "The specified file is not a Python file."

    if problem is not None:
        print(problem)
        sys.exit(1)

    try:
        command = ['python3.9', os.path.basename(file_path)]
        command.extend(sys.argv[1:])
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError as error:
        print(f"Error executing the file: {error}")
        sys.exit(1)
    except Exception as error:
        print(f"An unexpected error occurred: {error}")
        sys.exit(1)
|
|
|
|
|
|
def copy_files(source_folder, destination_folder):
    """Copy every entry of ``source_folder`` into ``destination_folder``.

    Each entry is copied with ``shutil.copy2``. The process exits with status
    1 if either folder is missing or if listing or copying fails.
    """
    source_missing = not os.path.exists(source_folder)
    if source_missing:
        print(f"The source folder {source_folder} does not exist.")
        sys.exit(1)

    destination_missing = not os.path.exists(destination_folder)
    if destination_missing:
        print(f"The destination folder {destination_folder} does not exist.")
        sys.exit(1)

    try:
        for name in os.listdir(source_folder):
            shutil.copy2(
                os.path.join(source_folder, name),
                os.path.join(destination_folder, name),
            )
    except Exception as error:
        print(f"An error occurred while copying files: {error}")
        sys.exit(1)
|
|
|
|
|
|
# this will extract the product id from the string which is provided as arg from the select dropdown where the user can select a product
|
|
def extract_product_id(product_type_id):
    """Pull the product id out of a dropdown label.

    Takes the text before the first space and returns the segment that
    follows its first ``#`` (up to the next ``#``, if any). Raises
    ``IndexError`` when that text contains no ``#``.
    """
    first_token, _, _ = product_type_id.partition(" ")
    hash_segments = first_token.split("#")
    return hash_segments[1]
|
|
|
|
|
|
def extract_filament_id(filament_type_id):
    """Print the raw value and return everything before its first space."""
    print("extract_filament_id", filament_type_id)
    filament_id, _sep, _rest = filament_type_id.partition(" ")
    return filament_id
|
|
|
|
|
|
def create_qrcode(qr_code_url, save_path, back_color):
    """Render ``qr_code_url`` as a QR code image and save it to ``save_path``.

    The code is drawn in black on ``back_color`` with no border.
    """
    settings = {
        "version": 1,
        "error_correction": qrcode.constants.ERROR_CORRECT_L,
        "box_size": 10,
        "border": 0,
    }
    code = qrcode.QRCode(**settings)
    code.add_data(qr_code_url)
    code.make(fit=True)

    # save_path looks like ./test.png
    image = code.make_image(fill_color="black", back_color=back_color)
    image.save(save_path)
|
|
|
|
|
|
def get_secrets():
    """Return the parsed contents of ``../../secrets/secrets.json``.

    The path is relative to the current working directory.
    """
    with open('../../secrets/secrets.json') as secrets_file:
        raw = secrets_file.read()
    return json.loads(raw)
|
|
|
|
|
|
# create shopify discount code
|
|
# this function will retry 3 times to create a new discount code if the randomly generated code already exists
|
|
def create_shopify_discount_code(shopify_discount_code_price_rule_name):
    """Create a random 12-character Shopify discount code for a price rule.

    The price rule id is looked up by name under
    ``shopify.discount_price_rules`` in the secrets file. A 422 response is
    treated as "code rejected" (presumably because it already exists -
    TODO confirm against the Shopify API) and a fresh code is tried, up to
    three attempts in total.

    Args:
        shopify_discount_code_price_rule_name: Key of the price rule inside
            ``discount_price_rules``.

    Returns:
        The discount code that Shopify accepted (status 201).

    Raises:
        SystemExit: On any response other than 201/422, or when all three
            attempts were rejected with 422.
    """
    max_attempts = 3

    # load data from secrets
    shopify_secrets = get_secrets()["shopify"]
    shopify_url = shopify_secrets["store_url"]
    x_shopify_app_access_token = shopify_secrets["x_shopify_app_access_token"]
    shopify_discount_code_price_rule = shopify_secrets["discount_price_rules"][shopify_discount_code_price_rule_name]

    characters = string.ascii_uppercase + string.digits
    # The codes are worth money, so draw them from the OS random source
    # instead of the predictable default generator.
    rng = random.SystemRandom()

    for attempt in range(1, max_attempts + 1):
        # generate random code
        discount_code = "".join(rng.choice(characters) for _ in range(12))

        response = requests.post(
            url=f"{shopify_url}/admin/api/2024-01/price_rules/{shopify_discount_code_price_rule}/discount_codes.json",
            headers={
                "X-Shopify-Access-Token": x_shopify_app_access_token,
                "Content-Type": "application/json"
            },
            json={
                "discount_code": {"code": discount_code}
            })

        if response.status_code == 201:
            print(f"Created discount code: {discount_code}. Retries: {attempt}")
            return discount_code

        if response.status_code == 422:
            # Previously this fell through to the generic failure exit below,
            # so the retry never actually happened.
            print("Failed to create code, response status code 422. Trying to generate new discount code")
            continue

        sys.exit(f"Failed to create discount code, status code: {response.status_code}")

    sys.exit("Error, tried 3 times to create discount code")
|
|
|
|
def get_email_footer_html(language, unsubscribe_link):
    """Build the HTML e-mail footer for ``language``.

    Every ``{{key}}`` placeholder in ``footer.html`` is replaced with the
    matching entry of ``lang.json`` for that language, with newlines turned
    into ``<br>``. ``{{unsubscribe_text_button}}`` becomes an anchor pointing
    at ``unsubscribe_link``, and ``{{unsubscribeLink}}`` becomes the bare
    link. Both files are read relative to the current working directory.
    """
    with open("../../groupsData/jnx-email-template/lang.json", "r") as lang_file:
        translations = json.load(lang_file)

    with open("../../groupsData/jnx-email-template/footer.html", "r") as template_file:
        html = template_file.read()

    def with_unsubscribe_anchor(markup):
        label = translations[language]['unsubscribe_text_button']
        return markup.replace("{{unsubscribe_text_button}}", f"<a href='{unsubscribe_link}'>{label}</a>")

    # substitute each translation into its placeholder, in file order
    for key, text in translations[language].items():
        if key == "unsubscribe_text_button":
            html = with_unsubscribe_anchor(html)
            continue
        html = html.replace("{{" + key + "}}", text.replace("\n", "<br>"))

    html = with_unsubscribe_anchor(html)
    return html.replace("{{unsubscribeLink}}", unsubscribe_link)
|
|
|
|
def get_email_footer_text(language, unsubscribe_link):
    """Build the plain-text e-mail footer for ``language``.

    Every ``{{key}}`` placeholder in ``footer.txt`` is replaced with the
    matching entry of ``lang.json`` for that language, and
    ``{{unsubscribeLink}}`` with ``unsubscribe_link``. Both files are read
    relative to the current working directory.
    """
    with open("../../groupsData/jnx-email-template/lang.json", "r") as lang_file:
        translations = json.load(lang_file)

    with open("../../groupsData/jnx-email-template/footer.txt", "r") as template_file:
        body = template_file.read()

    # substitute each translation into its placeholder, in file order
    for key, text in translations[language].items():
        body = body.replace("{{" + key + "}}", text)

    body = body.replace("{{unsubscribe_text_button}}", translations[language]['unsubscribe_text_button'])
    return body.replace("{{unsubscribeLink}}", unsubscribe_link)
|
|
|
|
|
|
def generate_unsubscribe_link(customerId):
    """Return the CRM unsubscribe link for a customer, creating it if missing.

    The customer's CRM links are searched, newest first, for one whose name
    starts with the unsubscribe marker. If none exists, a new link is created
    through the CRM API and the lookup is repeated. The returned value is the
    configured ``qr_code_url`` prefix followed by the link's ``Id``.

    Args:
        customerId: CRM id of the customer.

    Returns:
        The unsubscribe URL for the customer.

    Raises:
        SystemExit: With status 1 when a CRM request does not return 200, or
            when the link still cannot be found after creating it.
    """
    print(f"Using CRM unsubscribe link for customer {customerId}")

    _linkName = "JNX: ⛔️⛔️⛔️ WARN - E-Mail unsubscribed ⛔️⛔️⛔️"
    _secrets = get_secrets()
    crm_endpoint_url = _secrets['admin_dashboard']['url'] + "/api/v1/crm"

    headers = {
        "X-Api-Key": _secrets["admin_dashboard"]["x_api_key"],
        "Content-Type": "application/json"
    }

    def get_link():
        """Return the existing unsubscribe URL, or None when there is none."""
        response = requests.get(
            url=f"{crm_endpoint_url}/customer/view/{customerId}",
            headers=headers,
        )

        print(f"GetCustomerActivityLinks response status code {response.status_code}")

        if response.status_code != 200:
            print(f"GetCustomerActivityLinks req error {response.status_code}")
            sys.exit(1)

        # Tolerate a JSON null here; NOTE(review): confirm whether the API can
        # actually return null instead of an empty list.
        links = response.json()["Links"] or []

        if len(links) == 0:
            print("No links found")
            return None

        # newest first; assumes "CreatedAt" values sort chronologically
        # (e.g. ISO timestamps) - TODO confirm
        newest_first = sorted(links, key=lambda item: item["CreatedAt"], reverse=True)

        # find the newest link whose name starts with the unsubscribe marker
        created_link = next(
            (item for item in newest_first if item["Name"].startswith(_linkName)),
            None,
        )

        if created_link is None:
            print("Unsubscribe link not found, creating new one")
            return None

        return _secrets["admin_dashboard"]["qr_code_url"] + created_link['Id']

    link = get_link()

    if link is not None:
        print(f"Using existing unsubscribe link for customer {customerId}")
        return link

    print(f"Creating CRM unsubscribe link for customer {customerId}")

    response = requests.post(
        url=f"{crm_endpoint_url}/links",
        headers=headers,
        json={
            "CustomerId": customerId,
            "Name": _linkName,
            "Url": "https://jannex.de/unsubscribed"
        })

    # The original message interpolated the builtin `type`, which printed
    # "<class 'type'>"; report the customer instead.
    print(f"CreateCrmActivityLink response status code {response.status_code} for customer {customerId}")

    if response.status_code != 200:
        print(f"CreateCrmActivityLink req error {response.status_code}")
        sys.exit(1)

    # look the link up again to obtain the id of the one just created
    link = get_link()

    if link is None:
        print("Error creating unsubscribe link")
        sys.exit(1)

    return link