import html  # for sanitization
import os  # for environ vars
import random  # for tokens
import re  # to match our template system
import smtplib
import string  # for tokens
import sys  # to print to stderr
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import bottle
import pymongo  # database
from dotenv import load_dotenv

# Short aliases for the bottle objects used by the route handlers.
request = bottle.request
response = bottle.response
redirect = bottle.redirect

##################################################### Bottle stuff ############################################$


class MissingParameterException(Exception):
    """Raised when a required argument is missing."""


class StripPathMiddleware:
    """WSGI wrapper that removes trailing slashes from the request path."""

    def __init__(self, a):
        # `a` is the wrapped WSGI application.
        self.a = a

    def __call__(self, e, h):
        # `e` is the WSGI environ dict, `h` the start_response callable.
        trimmed_path = e['PATH_INFO'].rstrip('/')
        e['PATH_INFO'] = trimmed_path
        return self.a(e, h)


app = application = bottle.Bottle()

##################################################### Configuration ############################################$

# Load file from .env file.
# os.path.join supplies the path separator between the module directory and
# the '.env' file name.
load_dotenv(os.path.join(os.path.dirname(__file__), '.env'))


def _require_env(name):
    """Return the value of environment variable `name`.

    Raises:
        MissingParameterException: if the variable is not set.
    """
    try:
        return os.environ[name]
    except KeyError:
        raise MissingParameterException(
            "Environment variable {} is missing".format(name)) from None


token_chars = string.ascii_lowercase + string.ascii_uppercase + string.digits
token_len = 50

# Get address and port from env
listen_address = os.environ.get('LISTEN_ADDRESS', '0.0.0.0')
listen_port = os.environ.get('LISTEN_PORT', 8080)

# Get mail related informations from env
mail_default_subject = os.environ.get('MAIL_DEFAULT_SUBJECT', 'Nouveau message')
mail_subject_prefix = os.environ.get('MAIL_SUBJECT_PREFIX', '[Contact]')

# Redirect info
success_redirect_default = os.environ.get('SUCCESS_REDIRECT_DEFAULT', '/success')
failure_redirect_default = os.environ.get('FAILURE_REDIRECT_DEFAULT', '/fail')

# Get SMTP infos from env (all mandatory)
smtp_server_address = _require_env('SMTP_SERVER_ADDRESS')
smtp_server_port = _require_env('SMTP_SERVER_PORT')
smtp_server_username = _require_env('SMTP_SERVER_USERNAME')
smtp_server_password = _require_env('SMTP_SERVER_PASSWORD')
smtp_server_sender = _require_env('SMTP_SERVER_SENDER')

# Get mongodb connection
mongodb_host = _require_env('MONGODB_HOST')
mongodb_port = os.environ.get('MONGODB_PORT', '27017')
mongodb_dbname = os.environ.get('MONGODB_DBNAME', 'contact_mailer')

# Security: exactly one transport mode is selected, SSL taking precedence.
if os.environ.get('SMTP_SSL') == 'true':
    security = 'ssl'
elif os.environ.get('SMTP_STARTTLS') == 'true':
    security = 'starttls'
else:
    raise MissingParameterException('No security env var (SMTP_SSL or SMTP_STARTTLS) have been defined. (Expected true or false)')

admin_password = _require_env('ADMIN_PASSWORD')

# mongodb initialization
mongodb_client = pymongo.MongoClient("mongodb://{}:{}/".format(mongodb_host, mongodb_port))
mongodb_database = mongodb_client[mongodb_dbname]

# form template regex: matches {{name}} and {{name|default}}
form_regex = r'\{\{(\w+)(\|\w+)?\}\}'


@app.post('/fail')
def fail():
    """Debug endpoint that currently always raises."""
    # NOTE(review): the division below always raises ZeroDivisionError, so the
    # two statements after it never execute. This looks like a deliberate crash
    # used to exercise error handling - confirm before removing it.
    a = 2/0
    print('lol, failed', file=sys.stderr)
    return 'failed'


@app.post('/submit')
def submission():
    """Render the form identified by the posted token and send it as a mail.

    Reads `token` (required) and `mail` (optional, used as reply-to) from the
    posted form, plus any field referenced by the stored templates.

    Returns:
        An HTML confirmation on success, otherwise an error message with the
        response status set to 400 (missing token / unknown form), 404
        (missing mandatory template field) or 500 (mail refused).
    """
    if 'token' not in request.forms:
        response.status = 400
        return 'Le jeton d’autentification est requis'
    token = request.forms.getunicode('token')

    # The sender address is optional; an empty value means "no reply-to".
    if 'mail' in request.forms:
        from_address = request.forms.getunicode('mail')
    else:
        from_address = ''

    form = mongodb_database['forms'].find_one({'token': token})
    if form is None:
        response.status = 400
        return 'Le formulaire est introuvable'

    try:
        subject_fields = fill_fields(request, get_fields(form['subject']))
        content_fields = fill_fields(request, get_fields(form['content']))
    except MissingParameterException as e:
        response.status = 404
        return str(e)

    subject = _render_template(form['subject'], subject_fields)
    content = _render_template(form['content'], content_fields)

    if not send_mail(from_address, form['mail'], subject, content):
        response.status = 500
        return 'Le mail n’a pas pu être envoyé.'

    # Confirmation page. The link needs a `{}` placeholder for
    # .format(origin) to have any effect, and Origin is a client-supplied
    # header, so it is escaped before being embedded in an attribute.
    origin = request.headers.get('origin')
    page = '<p>Mail envoyé !</p>'
    if origin:
        page += '<p><a href="{}">Retour au formulaire de contact</a></p>'.format(
            html.escape(origin, quote=True))
    return page


##################################################### Helpers ############################################$

def _new_token():
    """Return a fresh random token of `token_len` characters from `token_chars`."""
    # Function-scope import keeps this block self-contained.
    import secrets
    return ''.join(secrets.choice(token_chars) for _ in range(token_len))


def _single_line(value):
    """Collapse line breaks in `value` so it cannot add extra mail headers."""
    return ' '.join(value.splitlines())


def _render_template(template, fields):
    """Replace every {{name}} / {{name|default}} in `template` with fields[name]."""
    # A callable replacement is inserted literally, so braces or backslashes
    # elsewhere in the template or in the values are left untouched.
    return re.sub(form_regex, lambda m: str(fields[m.group(1)]), template)


def get_fields(string):
    """Parse the string looking for template elements.

    Returns:
        A dict mapping each template field name to its default value, or to
        None when the field has no default and is therefore mandatory.
    """
    result = {}
    for match in re.finditer(form_regex, string):
        # group(2) is None when no "|default" part is present; when present it
        # still carries the leading '|', which is not part of the value.
        default = match.group(2)
        result[match.group(1)] = default[1:] if default else None
    return result


def fill_fields(request, fields):
    """Fill `fields` with values posted in `request`, keeping defaults otherwise.

    Args:
        request: the bottle request whose `forms` are read.
        fields: dict produced by get_fields(); updated in place.

    Returns:
        The same `fields` dict, with posted values HTML-escaped.

    Raises:
        MissingParameterException: if a mandatory field (default None) was
            not posted.
    """
    for field in fields:
        if field in request.forms:
            # Read through getunicode like the route handlers do; fall back to
            # the raw value if it yields nothing.
            value = request.forms.getunicode(field)
            if value is None:
                value = request.forms[field]
            fields[field] = html.escape(value)
        elif fields[field] is None:
            raise MissingParameterException("Le champs {} est obligatoire".format(field))
    return fields


def send_mail(from_address, to, subject, content):
    """Connect to the SMTP server, build a message and send it.

    Args:
        from_address: address of the person who filled the form; set as the
            reply-to header when non-empty.
        to: recipient address.
        subject: mail subject.
        content: plain-text body.

    Returns:
        True if no recipient was refused, False otherwise.
    """
    msg = MIMEMultipart()
    msg['From'] = smtp_server_sender
    # Header values built from user input are flattened to a single line, and
    # no reply-to header is emitted when there is no address to put in it.
    if from_address:
        msg.add_header('reply-to', _single_line(from_address))
    msg['To'] = to
    msg['Subject'] = _single_line(subject)
    msg.attach(MIMEText(content, 'plain', "utf-8"))

    # SMTP preambles. The module configuration only ever sets `security` to
    # 'ssl' or 'starttls'.
    if security == 'ssl':
        smtp = smtplib.SMTP_SSL(smtp_server_address, smtp_server_port)
    else:
        smtp = smtplib.SMTP(smtp_server_address, smtp_server_port)

    try:
        smtp.ehlo()
        if security == 'starttls':
            smtp.starttls()
            smtp.ehlo()

        # SMTP connection
        smtp.login(smtp_server_username, smtp_server_password)
        refused = smtp.sendmail(smtp_server_sender, to, msg.as_string())
    finally:
        # Release the connection even when login or sending raises.
        smtp.close()

    if refused:
        print('Message was not send to ' + str(refused))
        return False
    return True


def login(request):
    """Check if the requester is admin, a registered user, or a guest.

    Returns:
        A dict with a `_privilege` key: 0 = admin, 1 = logged-in user (the
        dict is then the user document itself), 1000 = guest.
    """
    # Function-scope import keeps this block self-contained.
    import hmac

    if 'admin_pass' in request.forms:
        supplied = request.forms.getunicode('admin_pass')
        # Guard against a missing value before encoding; compare in constant
        # time so the check does not leak how much of the password matched.
        if supplied is not None and hmac.compare_digest(
                supplied.encode('utf-8'), admin_password.encode('utf-8')):
            return {'_privilege': 0}

    if 'token' in request.forms:
        token = request.forms.getunicode('token')
        user = mongodb_database['users'].find_one({'token': token})
        if user is not None:
            user['_privilege'] = 1
            return user

    return {'_privilege': 1000}  # anonymous


##################################################### Forms ############################################$

@app.post('/form')
def create_form():
    """Create a form (subject/content templates + recipient) and return its token.

    Requires admin or logged-in privileges. Responds with status 400 and an
    error message when a field is missing or privileges are insufficient.
    """
    # Getting subject template
    if 'subject' in request.forms:
        subject = request.forms.getunicode('subject')
    elif mail_default_subject != '':
        subject = mail_default_subject
    else:
        response.status = 400
        return 'Le champs « sujet » est requis'

    # Getting mail content
    if 'content' in request.forms:
        content = request.forms.getunicode('content')
    else:
        response.status = 400
        return 'Le champs « contenu » est requis'

    # Getting the address the form's mails are sent to
    if 'mail' in request.forms:
        mail = request.forms.getunicode('mail')
    else:
        response.status = 400
        return 'Le champs « adresse » est requis'

    user = login(request)
    if user['_privilege'] > 1:
        response.status = 400
        return 'Privilèges insufisants'

    # TODO limit the insertion rate
    token = _new_token()
    mongodb_database['forms'].insert_one({
        'mail': mail,
        'content': content,
        'subject': subject,
        # The admin login dict has no '_id'; store None rather than crash.
        'user_id': user.get('_id'),
        'token': token,
    })

    return 'Créé : ' + token


@app.post('/form/list')
def list_forms():
    """List every form (admin) or the requester's own forms (logged-in user)."""
    user = login(request)
    if user['_privilege'] == 0:
        filt = {}
    elif user['_privilege'] == 1:
        filt = {'user_id': user['_id']}
    else:
        response.status = 400
        return 'Privilèges insufisants'

    return bottle.template("list.tpl", data=mongodb_database['forms'].find(filt))


# The handler takes `token`, so the route must declare it as a wildcard.
@app.delete('/form/<token>')
def delete_form(token):
    """Delete the form identified by `token` if the requester is admin or owner."""
    user = login(request)
    if user['_privilege'] > 1:
        response.status = 400
        return 'Privilèges insufisants'

    form = mongodb_database['forms'].find_one({'token': token})
    if form is None:
        response.status = 400
        return 'Le token n’est pas valide'

    # Admin is checked first: the admin login dict has no '_id' to compare.
    if user['_privilege'] == 0 or form['user_id'] == user['_id']:
        mongodb_database['forms'].delete_one({
            'token': token,
        })
        return 'Supprimé ' + token

    response.status = 400
    return 'Privilèges insufisants'


##################################################### Users ############################################$

@app.post('/user/list')
def list_users():
    """List all users. Admin only."""
    user = login(request)
    if user['_privilege'] > 0:
        response.status = 400
        return 'Privilèges insufisants'
    return bottle.template("list.tpl", data=mongodb_database['users'].find())


# The handler takes `username`, so the route must declare it as a wildcard.
@app.put('/user/<username>')
def create_user(username):
    """Create a user named `username` with a fresh token. Admin only."""
    user = login(request)
    if user['_privilege'] > 0:
        response.status = 400
        return 'Privilèges insufisants'

    if mongodb_database['users'].find_one({'username': username}) is not None:
        return 'L’utilisateur existe déjà'

    mongodb_database['users'].insert_one({
        'username': username,
        'token': _new_token(),
    })
    return 'Créé : ' + username


# The handler takes `username`, so the route must declare it as a wildcard.
@app.delete('/user/<username>')
def delete_user(username):
    """Delete the user named `username`. Admin only."""
    user = login(request)
    if user['_privilege'] > 0:
        response.status = 400
        return 'Privilèges insufisants'

    if mongodb_database['users'].find_one({'username': username}) is None:
        response.status = 400
        return 'L’utilisateur n’existe pas'

    mongodb_database['users'].delete_one({
        'username': username,
    })
    return 'Supprimé ' + username


if __name__ == '__main__':
    bottle.run(app=StripPathMiddleware(app), host=listen_address, port=listen_port, debug=True)
else:
    prod_app = StripPathMiddleware(app)