# Shopee Flash Sale Bot - Selenium Version
# Author: GaneMaX.Ai
# Requirements: pip install selenium webdriver-manager pandas
import os
import time
import json
import random
import threading
import pandas as pd
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from selenium.common.exceptions import TimeoutException, NoSuchElementException
class ShopeeBot:
"""
Advanced Shopee Flash Sale Bot using Selenium
"""
def __init__(self, account, proxy=None, headless=False):
self.account = account
self.proxy = proxy
self.headless = headless
self.driver = None
self.wait = None
self.success = False
self.order_id = None
def setup_driver(self):
"""Setup Chrome driver with anti-detection measures"""
options = Options()
# Anti-detection options
options.add_argument('--disable-blink-features=AutomationControlled')
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option('useAutomationExtension', False)
options.add_argument('--disable-gpu')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--disable-webgl')
options.add_argument('--disable-software-rasterizer')
options.add_argument('--disable-features=VizDisplayCompositor')
# Random user agent
user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15'
]
options.add_argument(f'user-agent={random.choice(user_agents)}')
# Proxy support
if self.proxy:
options.add_argument(f'--proxy-server={self.proxy}')
# Window size randomization
window_sizes = [(1920, 1080), (1366, 768), (1536, 864), (1440, 900)]
width, height = random.choice(window_sizes)
options.add_argument(f'--window-size={width},{height}')
# Headless mode
if self.headless:
options.add_argument('--headless=new')
# Initialize driver
service = Service(ChromeDriverManager().install())
self.driver = webdriver.Chrome(service=service, options=options)
self.wait = WebDriverWait(self.driver, 10)
# Remove webdriver property
self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
def random_delay(self, min_sec=1, max_sec=3):
"""Random delay to simulate human behavior"""
time.sleep(random.uniform(min_sec, max_sec))
def login(self):
"""Login to Shopee"""
try:
print(f"[{self.account['nickname']}] Logging in...")
# Click login button
login_btn = self.wait.until(
EC.element_to_be_clickable((By.XPATH, "//button[contains(text(), 'Masuk')]"))
)
login_btn.click()
self.random_delay(2, 4)
# Enter username
username_field = self.wait.until(
EC.presence_of_element_located((By.NAME, "loginKey"))
)
username_field.clear()
for char in self.account['username']:
username_field.send_keys(char)
time.sleep(random.uniform(0.05, 0.15))
self.random_delay(1, 2)
# Enter password
password_field = self.driver.find_element(By.NAME, "password")
password_field.clear()
for char in self.account['password']:
password_field.send_keys(char)
time.sleep(random.uniform(0.05, 0.15))
self.random_delay(1, 2)
# Submit login
submit_btn = self.driver.find_element(By.XPATH, "//button[contains(text(), 'Masuk')]")
submit_btn.click()
# Wait for login to complete
time.sleep(5)
# Check if login successful
if "login" not in self.driver.current_url.lower():
print(f"[{self.account['nickname']}] Login successful")
return True
else:
print(f"[{self.account['nickname']}] Login failed")
return False
except Exception as e:
print(f"[{self.account['nickname']}] Login error: {str(e)}")
return False
def handle_captcha(self):
"""Handle captcha if present"""
try:
captcha_elements = self.driver.find_elements(By.XPATH, "//*[contains(text(), 'captcha') or contains(text(), 'CAPTCHA')]")
if captcha_elements:
print(f"[{self.account['nickname']}] Captcha detected, waiting...")
time.sleep(10)
return True
except:
pass
return False
def purchase(self, url, quantity):
"""Execute purchase"""
try:
print(f"[{self.account['nickname']}] Navigating to product...")
self.driver.get(url)
self.random_delay(3, 5)
# Handle any popups
try:
close_btn = self.driver.find_element(By.XPATH, "//button[contains(@class, 'close')]")
close_btn.click()
except:
pass
# Wait for buy button
buy_btn = self.wait.until(
EC.element_to_be_clickable((By.XPATH, "//button[contains(text(), 'Beli Sekarang')]"))
)
# Adjust quantity
if quantity > 1:
try:
for _ in range(quantity - 1):
plus_btn = self.driver.find_element(By.XPATH, "//button[contains(@class, 'plus')]")
plus_btn.click()
time.sleep(random.uniform(0.3, 0.7))
except:
print(f"[{self.account['nickname']}] Quantity adjustment failed, using default")
# Click buy button
buy_btn.click()
self.random_delay(2, 4)
# Wait for checkout page
checkout_btn = self.wait.until(
EC.element_to_be_clickable((By.XPATH, "//button[contains(text(), 'Checkout')]"))
)
# Select shipping if needed
try:
shipping_options = self.driver.find_elements(By.XPATH, "//input[@type='radio']")
if shipping_options:
shipping_options[0].click()
self.random_delay(1, 2)
except:
pass
# Click checkout
checkout_btn.click()
self.random_delay(3, 5)
# Check for order confirmation
if "checkout" in self.driver.current_url.lower() or "order" in self.driver.current_url.lower():
print(f"[{self.account['nickname']}] Purchase successful!")
# Try to get order ID
try:
order_elements = self.driver.find_elements(By.XPATH, "//*[contains(text(), 'Order') or contains(text(), 'Pesanan')]")
for el in order_elements:
text = el.text
if "ORDER" in text.upper() or "PESANAN" in text.upper():
self.order_id = text
break
except:
pass
self.success = True
return True
else:
print(f"[{self.account['nickname']}] Purchase may have failed")
return False
except TimeoutException:
print(f"[{self.account['nickname']}] Timeout waiting for elements")
return False
except Exception as e:
print(f"[{self.account['nickname']}] Purchase error: {str(e)}")
return False
def run(self, url, quantity):
"""Main execution flow"""
try:
self.setup_driver()
self.driver.get("https://shopee.co.id")
self.random_delay(3, 5)
if self.login():
self.handle_captcha()
result = self.purchase(url, quantity)
return result
else:
return False
except Exception as e:
print(f"[{self.account['nickname']}] Bot error: {str(e)}")
return False
finally:
if self.driver:
time.sleep(random.uniform(1, 3))
self.driver.quit()
def get_result(self):
"""Get execution result"""
return {
'success': self.success,
'order_id': self.order_id,
'account': self.account['nickname'],
'timestamp': datetime.now().isoformat()
}
class BotManager:
"""Manager for running multiple bots"""
def __init__(self):
self.threads = []
self.results = []
self.lock = threading.Lock()
def worker(self, account, url, quantity, proxy=None):
"""Worker thread function"""
bot = ShopeeBot(account, proxy)
result = bot.run(url, quantity)
with self.lock:
self.results.append({
'account': account['nickname'],
'success': result,
'timestamp': datetime.now().isoformat()
})
def run_batch(self, accounts, url, quantity, proxies=None, max_threads=5):
"""Run multiple bots simultaneously"""
self.results = []
self.threads = []
# Limit threads
accounts_to_run = accounts[:max_threads]
for i, account in enumerate(accounts_to_run):
proxy = proxies[i % len(proxies)] if proxies else None
thread = threading.Thread(
target=self.worker,
args=(account, url, quantity, proxy)
)
self.threads.append(thread)
thread.start()
# Stagger start
time.sleep(random.uniform(1, 3))
# Wait for all threads
for thread in self.threads:
thread.join()
return self.results
# Example usage
if __name__ == "__main__":
# Load accounts from file
accounts = [
{'nickname': 'Account1', 'username': 'user1@email.com', 'password': 'pass123'},
{'nickname': 'Account2', 'username': 'user2@email.com', 'password': 'pass456'},
]
# Load proxies
proxies = [
'192.168.1.1:8080',
'203.0.113.5:3128',
]
# Bot configuration
target_url = "https://shopee.co.id/product-link"
quantity = 2
# Run manager
manager = BotManager()
results = manager.run_batch(accounts, target_url, quantity, proxies, max_threads=3)
# Print results
success_count = sum(1 for r in results if r['success'])
print(f"\n=== Results ===")
print(f"Total attempts: {len(results)}")
print(f"Successful: {success_count}")
print(f"Failed: {len(results) - success_count}")
# Save results
with open('bot_results.json', 'w') as f:
json.dump(results, f, indent=2)
# Shopee Flash Sale Bot - Requests Version (Lightweight)
# Author: GaneMaX.Ai
# Requirements: pip install requests beautifulsoup4 pandas
import requests
import json
import time
import random
import threading
from datetime import datetime
from bs4 import BeautifulSoup
import pandas as pd
class ShopeeAPIBot:
"""
Lightweight Shopee Bot using Requests (No Browser)
"""
def __init__(self, account, proxy=None):
self.account = account
self.proxy = proxy
self.session = requests.Session()
self.cookies = {}
self.headers = self.generate_headers()
self.success = False
self.order_id = None
def generate_headers(self):
"""Generate random headers"""
user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
]
return {
'User-Agent': random.choice(user_agents),
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'id-ID,id;q=0.9,en-US;q=0.8,en;q=0.7',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Cache-Control': 'max-age=0'
}
def setup_session(self):
"""Setup session with proxy if needed"""
if self.proxy:
self.session.proxies = {
'http': f'http://{self.proxy}',
'https': f'http://{self.proxy}'
}
self.session.headers.update(self.headers)
def extract_shopee_token(self, html):
"""Extract CSRF token from page"""
soup = BeautifulSoup(html, 'html.parser')
# Try to find token in meta tags
meta = soup.find('meta', {'name': 'csrf-token'})
if meta and meta.get('content'):
return meta['content']
# Try to find in scripts
scripts = soup.find_all('script')
for script in scripts:
if script.string and 'CSRF_TOKEN' in script.string:
# Extract token using regex or simple parsing
import re
match = re.search(r'CSRF_TOKEN[":\s]+([^"\s]+)', script.string)
if match:
return match.group(1)
return None
def login(self):
"""Login via API"""
try:
print(f"[{self.account['nickname']}] Attempting login...")
# Get login page first for tokens
login_page = self.session.get('https://shopee.co.id/login')
time.sleep(random.uniform(2, 4))
# Extract CSRF token
csrf_token = self.extract_shopee_token(login_page.text)
# Login API endpoint
login_url = 'https://shopee.co.id/api/v1/login/'
login_data = {
'username': self.account['username'],
'password': self.account['password'],
'remember_me': True,
'csrf_token': csrf_token
}
# Add appropriate headers for API
headers = {
'X-CSRFToken': csrf_token,
'X-Requested-With': 'XMLHttpRequest',
'Content-Type': 'application/json'
}
self.session.headers.update(headers)
# Send login request
response = self.session.post(login_url, json=login_data)
if response.status_code == 200:
result = response.json()
if result.get('error') == 0:
print(f"[{self.account['nickname']}] Login successful")
# Save cookies
self.cookies = self.session.cookies.get_dict()
return True
else:
print(f"[{self.account['nickname']}] Login failed: {result.get('msg', 'Unknown error')}")
return False
else:
print(f"[{self.account['nickname']}] Login HTTP error: {response.status_code}")
return False
except Exception as e:
print(f"[{self.account['nickname']}] Login error: {str(e)}")
return False
def get_product_info(self, url):
"""Get product information"""
try:
# Extract item ID from URL
# Example: https://shopee.co.id/product/123456789/987654321
parts = url.split('/')
shop_id = None
item_id = None
for i, part in enumerate(parts):
if part == 'product' and i + 2 < len(parts):
shop_id = parts[i + 1]
item_id = parts[i + 2]
break
if not item_id:
print(f"[{self.account['nickname']}] Could not extract item ID from URL")
return None
# API endpoint for product details
api_url = f'https://shopee.co.id/api/v4/item/get?itemid={item_id}&shopid={shop_id}'
response = self.session.get(api_url)
if response.status_code == 200:
data = response.json()
if data.get('data'):
product = data['data']
return {
'name': product.get('name'),
'price': product.get('price') / 100000, # Convert from cents
'stock': product.get('stock'),
'item_id': item_id,
'shop_id': shop_id
}
return None
except Exception as e:
print(f"[{self.account['nickname']}] Get product info error: {str(e)}")
return None
def add_to_cart(self, item_id, shop_id, quantity):
"""Add item to cart"""
try:
cart_url = 'https://shopee.co.id/api/v4/cart/add_to_cart'
cart_data = {
'item_id': int(item_id),
'shop_id': int(shop_id),
'quantity': quantity,
'model_id': 0 # Default model
}
response = self.session.post(cart_url, json=cart_data)
if response.status_code == 200:
result = response.json()
if result.get('error') == 0:
print(f"[{self.account['nickname']}] Added to cart")
return True
else:
print(f"[{self.account['nickname']}] Add to cart failed: {result.get('msg')}")
return False
else:
return False
except Exception as e:
print(f"[{self.account['nickname']}] Add to cart error: {str(e)}")
return False
def checkout(self):
"""Process checkout"""
try:
# Get cart items
cart_url = 'https://shopee.co.id/api/v4/cart/get'
cart_response = self.session.get(cart_url)
if cart_response.status_code != 200:
return False
cart_data = cart_response.json()
if not cart_data.get('data', {}).get('cart_items'):
print(f"[{self.account['nickname']}] Cart is empty")
return False
# Process checkout
checkout_url = 'https://shopee.co.id/api/v4/checkout/checkout'
checkout_data = {
'checkout_items': [],
'shipping_orders': []
}
# Prepare checkout items from cart
for item in cart_data['data']['cart_items']:
checkout_data['checkout_items'].append({
'item_id': item['item_id'],
'shop_id': item['shop_id'],
'model_id': item.get('model_id', 0),
'quantity': item['quantity']
})
response = self.session.post(checkout_url, json=checkout_data)
if response.status_code == 200:
result = response.json()
if result.get('error') == 0:
print(f"[{self.account['nickname']}] Checkout successful!")
# Try to get order ID
if result.get('data', {}).get('order_list'):
self.order_id = result['data']['order_list'][0].get('order_sn')
self.success = True
return True
else:
print(f"[{self.account['nickname']}] Checkout failed: {result.get('msg')}")
return False
else:
return False
except Exception as e:
print(f"[{self.account['nickname']}] Checkout error: {str(e)}")
return False
def run(self, url, quantity):
"""Main execution flow"""
try:
self.setup_session()
# Login
if not self.login():
return False
time.sleep(random.uniform(2, 4))
# Get product info
product = self.get_product_info(url)
if not product:
return False
print(f"[{self.account['nickname']}] Product: {product['name']} - Stock: {product['stock']}")
# Add to cart
if not self.add_to_cart(product['item_id'], product['shop_id'], quantity):
return False
time.sleep(random.uniform(1, 3))
# Checkout
if self.checkout():
return True
else:
return False
except Exception as e:
print(f"[{self.account['nickname']}] Bot error: {str(e)}")
return False
# Example usage
if __name__ == "__main__":
accounts = [
{'nickname': 'Acc1', 'username': 'user1', 'password': 'pass1'},
{'nickname': 'Acc2', 'username': 'user2', 'password': 'pass2'}
]
proxies = ['proxy1:8080', 'proxy2:8080']
url = "https://shopee.co.id/product/123456789/987654321"
results = []
for i, account in enumerate(accounts):
proxy = proxies[i % len(proxies)] if proxies else None
bot = ShopeeAPIBot(account, proxy)
result = bot.run(url, quantity=2)
results.append({
'account': account['nickname'],
'success': result,
'order_id': bot.order_id
})
# Print summary
success_count = sum(1 for r in results if r['success'])
print(f"\nSuccess: {success_count}/{len(results)}")
# Shopee API Client - Full API Integration
# Author: GaneMaX.Ai
# Requirements: pip install requests websocket-client
import requests
import json
import time
import hmac
import hashlib
import base64
import random
from datetime import datetime
from typing import Dict, List, Optional
class ShopeeAPIClient:
"""
Complete Shopee API Client
Handles authentication, product queries, cart operations, and checkout
"""
def __init__(self, partner_id: int = None, partner_key: str = None, shop_id: int = None):
self.partner_id = partner_id or 100001 # Default test ID
self.partner_key = partner_key or "test_partner_key"
self.shop_id = shop_id
self.access_token = None
self.refresh_token = None
self.base_url = "https://partner.shopeemobile.com"
self.api_version = "v2"
self.session = requests.Session()
def generate_signature(self, path: str, timestamp: int) -> str:
"""Generate API signature"""
base_string = f"{self.partner_id}{path}{timestamp}"
signature = hmac.new(
self.partner_key.encode('utf-8'),
base_string.encode('utf-8'),
hashlib.sha256
).hexdigest()
return signature
def make_request(self, method: str, path: str, params: Dict = None, data: Dict = None) -> Dict:
"""Make authenticated API request"""
timestamp = int(time.time())
if params is None:
params = {}
# Add auth params
params.update({
'partner_id': self.partner_id,
'timestamp': timestamp,
'sign': self.generate_signature(path, timestamp)
})
if self.access_token:
params['access_token'] = self.access_token
if self.shop_id:
params['shop_id'] = self.shop_id
url = f"{self.base_url}{path}"
response = self.session.request(method, url, params=params, json=data)
return response.json()
def get_access_token(self, auth_code: str) -> bool:
"""Get access token using authorization code"""
path = f"/api/{self.api_version}/auth/token/get"
data = {
'code': auth_code,
'partner_id': self.partner_id,
'shop_id': self.shop_id
}
result = self.make_request('POST', path, data=data)
if result.get('error') is None:
self.access_token = result.get('access_token')
self.refresh_token = result.get('refresh_token')
return True
return False
def refresh_access_token(self) -> bool:
"""Refresh access token"""
if not self.refresh_token:
return False
path = f"/api/{self.api_version}/auth/access_token/get"
data = {
'refresh_token': self.refresh_token,
'partner_id': self.partner_id,
'shop_id': self.shop_id
}
result = self.make_request('POST', path, data=data)
if result.get('error') is None:
self.access_token = result.get('access_token')
return True
return False
def get_product_detail(self, item_id: int) -> Dict:
"""Get product details"""
path = f"/api/{self.api_version}/product/get_item_detail"
params = {
'item_id': item_id
}
return self.make_request('GET', path, params=params)
def get_flash_sale_items(self, limit: int = 100) -> List[Dict]:
"""Get flash sale items"""
path = f"/api/{self.api_version}/flash_sale/get_flash_sale_items"
params = {
'limit': limit,
'offset': 0
}
result = self.make_request('GET', path, params=params)
return result.get('response', {}).get('items', [])
def add_to_cart(self, item_id: int, quantity: int) -> bool:
"""Add item to cart"""
path = f"/api/{self.api_version}/cart/add"
data = {
'item_id': item_id,
'quantity': quantity
}
result = self.make_request('POST', path, data=data)
return result.get('error') is None
def get_cart(self) -> Dict:
"""Get cart contents"""
path = f"/api/{self.api_version}/cart/get"
result = self.make_request('GET', path)
return result.get('response', {})
def checkout(self, checkout_items: List[Dict]) -> Dict:
"""Process checkout"""
path = f"/api/{self.api_version}/checkout/checkout"
data = {
'checkout_items': checkout_items,
'payment_method': 'cod', # Cash on delivery
'shipping_options': []
}
result = self.make_request('POST', path, data=data)
return result
class FlashSaleMonitor:
"""Monitor and automatically purchase flash sale items"""
def __init__(self, api_client: ShopeeAPIClient):
self.api = api_client
self.monitoring = False
self.target_items = []
def add_target(self, item_id: int, max_price: int):
"""Add item to monitoring list"""
self.target_items.append({
'item_id': item_id,
'max_price': max_price,
'purchased': False
})
def check_price(self, item_id: int) -> Optional[int]:
"""Check current price of item"""
result = self.api.get_product_detail(item_id)
if result.get('error') is None:
item = result.get('response', {})
return item.get('price')
return None
def monitor_loop(self, check_interval: float = 1.0):
"""Main monitoring loop"""
self.monitoring = True
while self.monitoring:
for target in self.target_items:
if target['purchased']:
continue
price = self.check_price(target['item_id'])
if price and price <= target['max_price']:
print(f"Item {target['item_id']} at target price: {price}")
# Add to cart
if self.api.add_to_cart(target['item_id'], 1):
print("Added to cart, proceeding to checkout...")
# Get cart and checkout
cart = self.api.get_cart()
if cart.get('cart_items'):
checkout_result = self.api.checkout(cart['cart_items'])
if checkout_result.get('error') is None:
print("Checkout successful!")
target['purchased'] = True
time.sleep(check_interval)
def stop(self):
"""Stop monitoring"""
self.monitoring = False
# Example usage
if __name__ == "__main__":
# Initialize client
client = ShopeeAPIClient(
partner_id=123456,
partner_key="your_partner_key",
shop_id=789012
)
# Monitor flash sale
monitor = FlashSaleMonitor(client)
monitor.add_target(123456789, 10000) # Item ID, max price 10k
monitor.monitor_loop()
# Multi-Thread Manager for Shopee Bots
# Author: GaneMaX.Ai
# Requirements: pip install concurrent.futures pandas
import concurrent.futures
import threading
import queue
import time
import random
import json
from dataclasses import dataclass
from typing import List, Dict, Optional
from datetime import datetime
import pandas as pd
@dataclass
class BotTask:
"""Represents a bot task"""
account_id: int
account_username: str
account_password: str
target_url: str
quantity: int
proxy: Optional[str] = None
priority: int = 1 # 1 = highest, 5 = lowest
@dataclass
class BotResult:
"""Represents bot execution result"""
task: BotTask
success: bool
order_id: Optional[str] = None
error_message: Optional[str] = None
execution_time: float = 0
timestamp: str = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now().isoformat()
class BotWorker:
"""Individual bot worker"""
def __init__(self, worker_id: int, task_queue: queue.Queue, result_queue: queue.Queue):
self.worker_id = worker_id
self.task_queue = task_queue
self.result_queue = result_queue
self.running = False
self.current_task = None
def execute_task(self, task: BotTask) -> BotResult:
"""Execute a single bot task"""
start_time = time.time()
try:
print(f"[Worker {self.worker_id}] Starting task for {task.account_username}")
# Simulate bot execution
time.sleep(random.uniform(2, 5))
# Simulate success/failure (80% success rate)
success = random.random() < 0.8
if success:
order_id = f"ORDER{random.randint(100000, 999999)}"
print(f"[Worker {self.worker_id}] Success! Order: {order_id}")
return BotResult(
task=task,
success=True,
order_id=order_id,
execution_time=time.time() - start_time
)
else:
print(f"[Worker {self.worker_id}] Failed")
return BotResult(
task=task,
success=False,
error_message="Purchase failed",
execution_time=time.time() - start_time
)
except Exception as e:
print(f"[Worker {self.worker_id}] Error: {str(e)}")
return BotResult(
task=task,
success=False,
error_message=str(e),
execution_time=time.time() - start_time
)
def run(self):
"""Main worker loop"""
self.running = True
while self.running:
try:
# Get task from queue (timeout after 1 second)
task = self.task_queue.get(timeout=1)
self.current_task = task
# Execute task
result = self.execute_task(task)
# Send result
self.result_queue.put(result)
# Mark task as done
self.task_queue.task_done()
self.current_task = None
except queue.Empty:
# No tasks available
continue
except Exception as e:
print(f"[Worker {self.worker_id}] Worker error: {str(e)}")
self.running = False
def stop(self):
"""Stop the worker"""
self.running = False
class ThreadManager:
"""Manages multiple bot workers"""
def __init__(self, max_workers: int = 5):
self.max_workers = max_workers
self.task_queue = queue.Queue()
self.result_queue = queue.Queue()
self.workers = []
self.results = []
self.stats = {
'total_tasks': 0,
'completed': 0,
'success': 0,
'failed': 0,
'start_time': None,
'end_time': None
}
def start_workers(self):
"""Start all worker threads"""
self.stats['start_time'] = datetime.now()
for i in range(self.max_workers):
worker = BotWorker(i + 1, self.task_queue, self.result_queue)
thread = threading.Thread(target=worker.run, daemon=True)
thread.start()
self.workers.append({
'worker': worker,
'thread': thread
})
print(f"Started {self.max_workers} workers")
def stop_workers(self):
"""Stop all workers"""
for w in self.workers:
w['worker'].stop()
for w in self.workers:
w['thread'].join(timeout=5)
self.stats['end_time'] = datetime.now()
print("All workers stopped")
def add_task(self, task: BotTask):
"""Add a task to the queue"""
self.task_queue.put(task)
self.stats['total_tasks'] += 1
def add_tasks(self, tasks: List[BotTask]):
"""Add multiple tasks"""
for task in tasks:
self.add_task(task)
def collect_results(self, timeout: Optional[float] = None) -> List[BotResult]:
"""Collect all results"""
results = []
completed = 0
while completed < self.stats['total_tasks']:
try:
result = self.result_queue.get(timeout=timeout or 1)
results.append(result)
completed += 1
# Update stats
if result.success:
self.stats['success'] += 1
else:
self.stats['failed'] += 1
print(f"Progress: {completed}/{self.stats['total_tasks']} tasks completed")
except queue.Empty:
if timeout is not None:
break
continue
self.results = results
self.stats['completed'] = completed
return results
def get_stats(self) -> Dict:
"""Get execution statistics"""
if self.stats['start_time'] and self.stats['end_time']:
duration = (self.stats['end_time'] - self.stats['start_time']).total_seconds()
else:
duration = 0
success_rate = (self.stats['success'] / self.stats['completed'] * 100) if self.stats['completed'] > 0 else 0
return {
**self.stats,
'duration_seconds': duration,
'success_rate': f"{success_rate:.1f}%",
'tasks_per_second': self.stats['completed'] / duration if duration > 0 else 0
}
def export_results(self, filename: str = 'bot_results.json'):
"""Export results to file"""
data = []
for r in self.results:
data.append({
'account': r.task.account_username,
'success': r.success,
'order_id': r.order_id,
'error': r.error_message,
'execution_time': r.execution_time,
'timestamp': r.timestamp
})
with open(filename, 'w') as f:
json.dump(data, f, indent=2)
print(f"Results exported to {filename}")
def generate_report(self) -> pd.DataFrame:
"""Generate pandas DataFrame report"""
data = []
for r in self.results:
data.append({
'Account': r.task.account_username,
'Success': '✅' if r.success else '❌',
'Order ID': r.order_id or '-',
'Error': r.error_message or '-',
'Time (s)': f"{r.execution_time:.2f}",
'Timestamp': r.timestamp
})
df = pd.DataFrame(data)
return df
# Example usage
if __name__ == "__main__":
# Create manager with 3 workers
manager = ThreadManager(max_workers=3)
# Create tasks
tasks = []
for i in range(10):
task = BotTask(
account_id=i + 1,
account_username=f"user{i+1}@email.com",
account_password=f"pass{i+1}",
target_url="https://shopee.co.id/product/123",
quantity=1,
proxy=f"proxy{i}.com:8080" if i % 2 == 0 else None,
priority=random.randint(1, 5)
)
tasks.append(task)
# Start workers
manager.start_workers()
# Add tasks
manager.add_tasks(tasks)
print(f"Added {len(tasks)} tasks to queue")
# Collect results
results = manager.collect_results()
# Stop workers
manager.stop_workers()
# Print stats
stats = manager.get_stats()
print("\n=== Statistics ===")
for key, value in stats.items():
print(f"{key}: {value}")
# Export results
manager.export_results()
# Generate report
df = manager.generate_report()
print("\n=== Report ===")
print(df.to_string(index=False))
# Save report to CSV
df.to_csv('bot_report.csv', index=False)
print("\nReport saved to bot_report.csv")