FLASK: How to establish connection with mysql server?

I already have a mysql connection from Flask like this:

app.config['MYSQL_HOST'] = 'one.hostname.net'
app.config['MYSQL_USER'] = 'my_username'
app.config['MYSQL_PASSWORD'] = 'my_password'
app.config['MYSQL_DB'] = 'user_mydb'

mysql = MySQL(app)

with this setup I am able to use mysql database connection in flask. but when it comes to celery task, which is inside the same python file as flask.

@mycelery.task(bind=True, name='mytask')
def mytask(self, userid, port):
    cursor = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
    cursor.execute('SELECT * FROM mytable WHERE id = %s', (userid,))

It throws me an error saying

cursor = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
AttributeError: 'NoneType' object has no attribute 'cursor'

I understand it is because the celery has no MySQL connection made. But How can I establish the connection? so that I don’t have to connect with the MySQL server whenever I create a task just like how it is don’t for flask where we already established the connection by MySQL = MySQL(app)??

Here is my celery setup, if it is helpful to add something to this code

mycelery = Celery(app.name)
mycelery.conf.update({
    'broker_url': 'filesystem://',
    'broker_transport_options': {
        'data_folder_in': 'app/broker/out',
        'data_folder_out': 'app/broker/out',
        'data_folder_processed': 'app/broker/processed'
    },

    'result_persistent': False,
    'task_serializer': 'json',
    'result_serializer': 'json',
    'accept_content': ['json']})

Answers:

Thank you for visiting the Q&A section on Magenaut. Please note that all the answers may not help you solve the issue immediately. So please treat them as advisements. If you found the post helpful (or not), leave a comment & I’ll get back to you as soon as possible.

Method 1

Firstly if you want to do dabatabase operation you can do in celery task, but you dont have to connect db with celery.
You can connect flask with db, and install celery in your project and make db operation in your celery task.

Sample:

app.py

from flask import Flask
from flaskext.mysql import MySQL

app = Flask(__name__)
mysql = MySQL()
app.config['MYSQL_DATABASE_USER'] = ''
app.config['MYSQL_DATABASE_PASSWORD'] = ''
app.config['MYSQL_DATABASE_DB'] = ''
app.config['MYSQL_DATABASE_HOST'] = ''
mysql.init_app(app)

tasks.py

@celery.task
def db_connect_things(): 
    conn = mysql.connect()
    cursor =conn.cursor()
    sql_query = """select from where """
    cursor.execute(sql_query)
    ...

celery_config.py

from celery import Celery
celery = Celery(__name__)
celery = Celery('tasks', broker=) # rabbit,redis, ..
celery.conf.update({'CELERY_ACCEPT_CONTENT': ['pickle', 'json', 'msgpack', 'yaml']})
celery.conf.add_defaults(...)

celery.conf.update(CELERYBEAT_SCHEDULE={
    'db_connect_things': {
        'task': 'application.lib.tasks.db_connect_things',
        'schedule': crontab(minute=0, hour='*/12'),
    }})

class ContextTask(celery.Task):
...


All methods was sourced from stackoverflow.com or stackexchange.com, is licensed under cc by-sa 2.5, cc by-sa 3.0 and cc by-sa 4.0

0 0 votes
Article Rating
Subscribe
Notify of
guest

0 Comments
Inline Feedbacks
View all comments
0
Would love your thoughts, please comment.x
()
x