[PYTHON] Use SQLAlchemy and multiprocessing

A sample that combines multiprocessing, Python's convenient multi-process library, with SQLAlchemy. Data is exchanged between processes through Queue objects, and the DB is accessed through a SQLAlchemy session.

from multiprocessing import (Process, Queue)
from sqlalchemy import (create_engine, MetaData)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import (sessionmaker, scoped_session)


engine = create_engine("mysql://{user}:{passwd}@{host}/{db}")
# _Session = scoped_session(sessionmaker())
Session = sessionmaker(bind=engine)  # bind explicitly so sessions know which engine to use
metadata = MetaData(engine)
Base = declarative_base()
# Base.query = _Session.query_property()

####################
# Table definition omitted (Item table)
####################

class Hoge(object):
    def __init__(self):
        '''Create the queue the worker processes read from and the queue they write results to'''
        self.in_queue = Queue()
        self.out_queue = Queue()
        self.session = Session()

    def __del__(self):
        self.session.commit()
        self.session.close()

    def get_items(self, worker_num=4):
        '''Get data from the DB using multiple processes.

        Put the ids to fetch on the input queue, then let several worker
        processes query the DB and retrieve the rows.
        '''
        item_ids = self.session.query(Item.id).all()

        for item_id in item_ids:
            self.in_queue.put(item_id[0])

        jobs = []
        for i in range(worker_num):
            p = Process(target=self.worker, args=[])
            p.daemon = True
            jobs.append(p)
            p.start()

        for job in jobs:
            job.join()
            # Report each process's exit code
            print('{name}.exitcode = {code}'.format(name=job.name, code=job.exitcode))

        return True

    def worker(self):
        '''Multi-process worker.

        Get an id from self.in_queue, fetch the data from the DB,
        and store the result in self.out_queue.
        '''
        while not self.in_queue.empty():
            item_id = self.in_queue.get()
            print(item_id)
            try:
                item = self.session.query(Item).filter(Item.id == item_id).first()
            except Exception as e:
                print('error => {0}'.format(e))
                continue
            if item is None:
                # first() returns None when no row matches the id
                continue
            self.out_queue.put(item.name)
            print('in_queue: {0} (out_queue: {1})'
                  .format(self.in_queue.qsize(), self.out_queue.qsize()))
        return True  # Without this, the process does not finish
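The worker loop above checks `in_queue.empty()` before calling `get()`. The multiprocessing documentation describes `empty()` as unreliable, and another worker can take the last id between the two calls, which leaves `get()` blocked. Here is a minimal sketch of one alternative, written for Python 3 and not taken from the original post: put one sentinel per worker on the queue and loop until it arrives. The `STOP` value, the function names, and the fake lookup are assumptions for illustration.

```python
from multiprocessing import Process, Queue

STOP = None  # hypothetical sentinel; assumes None is never a real id


def worker(in_queue, out_queue):
    """Process ids until the sentinel is received."""
    # iter(callable, sentinel) keeps calling in_queue.get() until it returns
    # STOP, so the loop blocks waiting for work instead of polling empty().
    for item_id in iter(in_queue.get, STOP):
        out_queue.put((item_id, "item-%d" % item_id))  # stand-in for the DB lookup


def get_items(item_ids, worker_num=4):
    in_queue, out_queue = Queue(), Queue()
    for item_id in item_ids:
        in_queue.put(item_id)
    for _ in range(worker_num):
        in_queue.put(STOP)  # one sentinel per worker

    jobs = [Process(target=worker, args=(in_queue, out_queue))
            for _ in range(worker_num)]
    for p in jobs:
        p.start()
    # Exactly one result per id is expected, so read that many before joining.
    results = dict(out_queue.get() for _ in item_ids)
    for p in jobs:
        p.join()
    return results


if __name__ == "__main__":
    print(get_items([1, 2, 3, 4, 5]))
```

Passing the queues to a module-level function, rather than using a bound method as the target, also avoids having to pickle the whole object, session included, on platforms that start processes with spawn.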

Important points

Note that if you create the Session with scoped_session here, you may get an error such as:

Could not locate column in row for column …
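One plausible cause of the `Could not locate column in row` error, which the original post does not confirm, is that the session and its DB connection are created in the parent and then used by every child process. The SQLAlchemy documentation advises against sharing connections across process boundaries. Below is a hedged sketch for Python 3 and SQLAlchemy 1.4 or later, in which each worker builds its own engine and session. The `Item` columns, the SQLite URL, and the helper names `seed`, `worker`, and `run` are assumptions for illustration.

```python
import os
import tempfile
from multiprocessing import Process, Queue

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Item(Base):
    """Stand-in for the omitted Item table (columns are assumed)."""
    __tablename__ = "item"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))


def seed(db_url, names):
    """Create the table, insert rows, and return their ids."""
    engine = create_engine(db_url)
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()
    items = [Item(name=name) for name in names]
    session.add_all(items)
    session.commit()
    ids = [item.id for item in items]
    session.close()
    engine.dispose()  # the parent keeps no open connection
    return ids


def worker(db_url, item_ids, out_queue):
    """Look up names with an engine and session owned by this process."""
    engine = create_engine(db_url)  # created inside the child, not inherited
    session = sessionmaker(bind=engine)()
    try:
        for item_id in item_ids:
            item = session.query(Item).filter(Item.id == item_id).first()
            out_queue.put((item_id, item.name if item is not None else None))
    finally:
        session.close()
        engine.dispose()


def run(db_url, item_ids, worker_num=2):
    out_queue = Queue()
    chunks = [item_ids[i::worker_num] for i in range(worker_num)]
    jobs = [Process(target=worker, args=(db_url, chunk, out_queue))
            for chunk in chunks]
    for p in jobs:
        p.start()
    results = dict(out_queue.get() for _ in item_ids)  # read results, then join
    for p in jobs:
        p.join()
    return results


if __name__ == "__main__":
    url = "sqlite:///" + os.path.join(tempfile.mkdtemp(), "demo.db")
    print(run(url, seed(url, ["spam", "egg", "ham"])))
```

Only the URL string crosses the process boundary here, so no connection or session state is shared between parent and children.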

I also got the following error, but I could not work out the cause because I could not reproduce it:

Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed
    send(obj)
IOError: [Errno 32] Broken pipe
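The traceback comes from the queue's feeder thread (`_feed`), which writes buffered items to a pipe, and `Broken pipe` means the reading end was already gone. The exact cause here is unknown. One documented pitfall in the same area is joining a process before the items it put on a queue have been read, so a common precaution is to drain the output queue first and join afterwards. Below is a minimal Python 3 sketch of that order, which also copes with workers that skip some items. `DONE`, `worker`, `drain`, and `run` are hypothetical names, and the squaring stands in for the DB lookup.

```python
from multiprocessing import Process, Queue

DONE = "__done__"  # hypothetical marker a worker sends when it has finished


def worker(numbers, out_queue):
    """Put results on the queue; odd numbers are skipped to mimic failed lookups."""
    for n in numbers:
        if n % 2 == 0:
            out_queue.put(n * n)
    out_queue.put(DONE)


def drain(out_queue, worker_num):
    """Read until every worker has reported DONE, so no buffered data is left."""
    results, finished = [], 0
    while finished < worker_num:
        value = out_queue.get()
        if value == DONE:
            finished += 1
        else:
            results.append(value)
    return results


def run(chunks):
    out_queue = Queue()
    jobs = [Process(target=worker, args=(chunk, out_queue)) for chunk in chunks]
    for p in jobs:
        p.start()
    results = drain(out_queue, len(jobs))  # drain first
    for p in jobs:
        p.join()  # then join
    return results


if __name__ == "__main__":
    print(sorted(run([[1, 2, 3], [4, 5, 6]])))
```

Because each worker announces when it is done, the parent does not need to know in advance how many results to expect.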
