A Simple Wrapper for PyMongo, the Official MongoDB Python Driver



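Recently I needed Python to perform some simple operations on MongoDB, and I did not want to pull in any heavyweight framework. For reusability, I wanted to write a thin wrapper around PyMongo, the official MongoDB Python driver. Searching Baidu never turned up a satisfying result, hence this article.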

[Main text]

PyMongo, the official MongoDB Python driver

  • docs: https://api.mongodb.com/python/current/index.html
  • github: https://github.com/mongodb/mongo-python-driver

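The PyMongo driver supports almost every MongoDB feature and can connect to a standalone MongoDB server, a replica set or a sharded cluster. In terms of the API it exposes, the pymongo package is the core and covers all kinds of database operations. This article introduces a simple wrapper class, DBManager. Its main features: access to databases and collections is guarded so that they must already exist; native PyMongo operations remain fully available, including basic CRUD, bulk operations, MapReduce, multithreading and multiprocessing; and sequential (pipelined) operations are supported in causally consistent sessions and in transactions, with simple examples.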

MongoClient

The mongo_client module provides the MongoClient class for connecting to MongoDB:
class pymongo.mongo_client.MongoClient(host='localhost', port=27017, document_class=dict, tz_aware=False, connect=True, **kwargs)
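Every MongoClient instance (client for short below) maintains a built-in connection pool whose maxPoolSize defaults to 100. In multithreaded use, the pool gives each thread its own socket until the maximum number of connections is reached; subsequent threads block until a connection is released. In addition, client keeps one extra connection to every server in the MongoDB topology to monitor that server's state.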
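The blocking behaviour described above can be illustrated with a small, self-contained toy model. `ToyPool` is a hypothetical class, not PyMongo's pool: it only shows that at most `max_pool_size` sockets are checked out at the same time while the remaining threads wait. PyMongo's real pool also has options such as `waitQueueTimeoutMS` and checks socket health, which this sketch ignores.

```python
import threading
import time


class ToyPool:
    """Hypothetical toy model of a bounded connection pool (not PyMongo's implementation).

    At most max_pool_size "sockets" can be checked out at once; any further
    caller blocks in checkout() until another thread calls checkin().
    """

    def __init__(self, max_pool_size):
        self._slots = threading.BoundedSemaphore(max_pool_size)
        self._lock = threading.Lock()
        self.in_use = 0   # sockets currently checked out
        self.peak = 0     # highest number checked out at the same time

    def checkout(self):
        self._slots.acquire()  # blocks while all sockets are in use
        with self._lock:
            self.in_use += 1
            self.peak = max(self.peak, self.in_use)

    def checkin(self):
        with self._lock:
            self.in_use -= 1
        self._slots.release()  # lets one waiting thread proceed


def worker(pool):
    pool.checkout()
    try:
        time.sleep(0.01)  # pretend to run a query on the socket
    finally:
        pool.checkin()


pool = ToyPool(max_pool_size=3)
threads = [threading.Thread(target=worker, args=(pool,)) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(pool.peak, pool.in_use)  # peak is at most 3, in_use is back to 0
```

Ten threads compete for three sockets; seven of them spend part of their time blocked in `checkout()`, which mirrors what happens to threads beyond maxPoolSize.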

The new_mongo_client function below creates a client connected to the database. client.admin.command('ismaster') checks whether the server is available; it is cheap and does not require authentication.

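import logging

from pymongo import MongoClient
from pymongo.errors import ConnectionFailure


def new_mongo_client(uri, **kwargs):
    """Create new pymongo.mongo_client.MongoClient instance. DO NOT USE IT DIRECTLY."""

    kwargs.setdefault('maxPoolSize', 1024)  # a caller-supplied maxPoolSize overrides this default
    client = MongoClient(uri, **kwargs)
    try:
        client.admin.command('ismaster')  # The ismaster command is cheap and does not require auth.
    except ConnectionFailure:
        logging.error("new_mongo_client(): Server not available, please check your uri: {}".format(uri))
        client.close()  # release the pool and monitor threads of the unusable client
        return None
    return client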

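PyMongo is not fork-safe, but within one process it is thread-safe. The usual pattern is therefore to create one client per process for a given MongoDB deployment and to use that single instance for all subsequent database operations, including multithreaded ones. Never create a MongoClient instance for every operation and call MongoClient.close() when done: that is unnecessary and wastes a great deal of performance.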

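For these reasons, new_mongo_client should generally not be called directly to obtain a client; it is wrapped further in the get_mongo_client function. The global constant URI_CLIENT_DICT is a dictionary mapping each database URI string to its client, one client per URI. The code is as follows: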

MONGO_URI_DEFAULT = 'mongodb://localhost:27017/admin'
URI_CLIENT_DICT = {}    # one client per MongoDB URI, keyed by URI


def get_mongo_client(uri=MONGO_URI_DEFAULT, fork=False, **kwargs):
    """Get pymongo.mongo_client.MongoClient instance. One mongodb uri, one client.

    @:param uri: mongodb uri
    @:param fork: for fork safety with multiprocessing; if True, always return a new, uncached MongoClient instance. Default False.
    @:param kwargs: refer to pymongo.mongo_client.MongoClient kwargs
    """
    if fork:
        return new_mongo_client(uri, **kwargs)
    matched_client = URI_CLIENT_DICT.get(uri)
    if matched_client is None:  # no matched client
        new_client = new_mongo_client(uri, **kwargs)
        if new_client is not None:
            URI_CLIENT_DICT[uri] = new_client
        return new_client
    return matched_client
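get_mongo_client reads and then writes URI_CLIENT_DICT without a lock, so two threads that request the same URI for the first time at the same moment can both end up creating a client. A minimal sketch of one way to close that gap, assuming a lock on the creation path is acceptable: `get_cached_client`, `_CLIENT_CACHE` and `slow_factory` are hypothetical names, and the factory stands in for new_mongo_client so the sketch runs without a server. Keying the cache on `os.getpid()` as well is our own variation on the explicit `fork=True` flag, not the author's design: a forked child automatically gets a fresh client.

```python
import os
import threading
import time

_CLIENT_CACHE = {}                   # hypothetical cache: (pid, uri) -> client
_CLIENT_CACHE_LOCK = threading.Lock()


def get_cached_client(uri, factory):
    """Return one client per (process, URI), creating it at most once.

    `factory` stands in for new_mongo_client(uri); like it, it may return None.
    """
    key = (os.getpid(), uri)         # a forked child has a new pid, so it never reuses the parent's client
    client = _CLIENT_CACHE.get(key)
    if client is not None:           # fast path once the client exists
        return client
    with _CLIENT_CACHE_LOCK:
        client = _CLIENT_CACHE.get(key)  # re-check: another thread may have created it meanwhile
        if client is None:
            client = factory(uri)
            if client is not None:       # do not cache failed connections
                _CLIENT_CACHE[key] = client
        return client


calls = []


def slow_factory(uri):
    calls.append(uri)
    time.sleep(0.01)                 # widen the race window
    return object()                  # placeholder for a MongoClient


threads = [threading.Thread(target=get_cached_client, args=('mongodb://localhost:27017', slow_factory))
           for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(calls))  # 1: eight concurrent callers, one client
```

A forked child still inherits the parent's entries in the dictionary; they are simply never looked up under the child's own pid.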

Ensuring that the Database and Collection exist

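PyMongo has a peculiarity: queries against a database or collection that does not exist raise no error. The following IPython session operates on a nonexistent xxDB database and XXCollection collection: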

In [1]: from pymongo import MongoClient
In [2]: client = MongoClient()          # defaults to host 'localhost', port 27017
In [3]: db = client.get_database('xxDB')    # Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), u'xxDB')
In [4]: coll = db.get_collection('XXCollection')  #  Collection(Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), u'xxDB'), u'XXCollection')
In [5]: coll.find_one()                 # note: no tip, no error, no exception, return None
In [6]: coll.insert_one({'hello' : 'what a fucking feature'})
Out[6]: <pymongo.results.InsertOneResult at 0x524ccc8>
In [7]: coll.find_one()
Out[7]: {u'_id': ObjectId('5c31c807bb048515b814d719'), u'hello': u'what a fucking feature'}

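For any operation that follows a typo in a database or collection name, this is a disaster: reads quietly return nothing, and the first write quietly creates the misspelled database or collection, as In [6] above shows. It is therefore worth adding a guard when obtaining a database or collection.
For a database, the code below uses MongoClient.list_database_names() to get all database names and returns None if the requested name is not among them. Collections are handled the same way with Database.list_collection_names(). Note: if listing the databases or collections fails, for example because the user lacks the required privileges, the guard is skipped by default and the database or collection is returned unchecked.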
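The existence guard can be sketched without a server. `check_name` is a hypothetical helper, and `available` stands in for the list returned by `client.list_database_names()` or `db.list_collection_names()`. Unlike get_existing_db and get_existing_coll, which log and return None, this variant raises and uses the standard library's `difflib.get_close_matches` to suggest near-misses, which makes typos like the xxCollection/XXCollection mix-up above easier to spot.

```python
import difflib


def check_name(name, available, kind='collection'):
    """Return `name` if it is in `available`; otherwise raise with near-miss suggestions.

    `available` stands in for client.list_database_names() or db.list_collection_names().
    """
    if name in available:
        return name
    hints = difflib.get_close_matches(name, available, n=3, cutoff=0.6)
    hint = ' (did you mean: {}?)'.format(', '.join(hints)) if hints else ''
    raise LookupError('{} {!r} does not exist{}'.format(kind, name, hint))


existing = ['xxCollection', 'users', 'orders']
print(check_name('users', existing))
try:
    check_name('XXCollection', existing)
except LookupError as e:
    print(e)  # the message suggests 'xxCollection'
```

Raising instead of returning None is a design choice: it stops the caller at the typo rather than at a later `AttributeError` on None.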

import logging

from pymongo.errors import PyMongoError


def get_existing_db(client, db_name):
    """Get existing pymongo.database.Database instance.

    @:param client: pymongo.mongo_client.MongoClient instance
    @:param db_name: database name wanted
    """

    if client is None:
        logging.error('client {} is None'.format(client))
        return None
    try:
        db_available_list = client.list_database_names()
    except PyMongoError as e:
        logging.error('client: {}, db_name: {}, client.list_database_names() error: {}'.
                      format(client, db_name, repr(e)))
    else:
        if db_name not in db_available_list:
            logging.error('client {} has no db named {}'.format(client, db_name))
            return None
    db = client.get_database(db_name)
    return db

def get_existing_coll(db, coll_name):
    """Get existing pymongo.collection.Collection instance.

    @:param db: pymongo.database.Database instance
    @:param coll_name: collection name wanted
    """

    if db is None:
        logging.error('db {} is None'.format(db))
        return None
    try:
        coll_available_list = db.list_collection_names()
    except PyMongoError as e:
        logging.error('db: {}, coll_name: {}, db.list_collection_names() error: {}'.
                      format(db, coll_name, repr(e)))
    else:
        if coll_name not in coll_available_list:
            logging.error('db {} has no collection named {}'.format(db, coll_name))
            return None
    coll = db.get_collection(coll_name)
    return coll

The PyMongo wrapper class DBManager

The lengthy groundwork above was mainly meant to introduce this PyMongo wrapper class, DBManager.

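A DBManager instance holds three pieces of state, the MongoClient instance self.client, the database self.db and the collection self.coll, all exposed as properties. Native PyMongo methods work on this client, db and coll exactly as usual. client is obtained in the constructor through get_mongo_client above; db and coll can be set through the constructor or switched later through the self.db_name and self.coll_name setters.
A DBManager instance also provides the methods self.create_coll(self, db_name, coll_name), session_pipeline(self, pipeline) and transaction_pipeline(self, pipeline). The latter two are explained in detail in the next section.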

import logging

from pymongo.errors import CollectionInvalid


class DBManager:
    """A safe and simple pymongo wrapper class ensuring that the database and collection exist.

    Operations:
    MongoClient level operations: https://api.mongodb.com/python/current/api/pymongo/mongo_client.html
    Database level operations: https://api.mongodb.com/python/current/api/pymongo/database.html
    Collection level operations: https://api.mongodb.com/python/current/api/pymongo/collection.html
    """
    __default_uri = 'mongodb://localhost:27017/admin'
    __default_db_name = 'test'
    __default_coll_name = 'test'

    def __init__(self, uri=__default_uri, db_name=__default_db_name, coll_name=__default_coll_name, **kwargs):
        self.__uri = uri
        self.__db_name = db_name
        self.__coll_name = coll_name
        self.__client = get_mongo_client(uri, **kwargs)
        self.__db = get_existing_db(self.__client, db_name)
        self.__coll = get_existing_coll(self.__db, coll_name)

    def __str__(self):
        return u'uri: {}, db_name: {}, coll_name: {}, id_client: {}, client: {}, db: {}, coll: {}'.format(
            self.uri, self.db_name, self.coll_name, id(self.client), self.client, self.db, self.coll)

    @property
    def uri(self):
        return self.__uri

    @property
    def db_name(self):
        return self.__db_name

    @property
    def coll_name(self):
        return self.__coll_name

    @db_name.setter
    def db_name(self, db_name):
        self.__db_name = db_name
        self.__db = get_existing_db(self.__client, db_name)

    @coll_name.setter
    def coll_name(self, coll_name):
        self.__coll_name = coll_name
        self.__coll = get_existing_coll(self.__db, coll_name)

    @property
    def client(self):
        return self.__client

    @property
    def db(self):
        return self.__db

    @property
    def coll(self):
        # always use the current instance self.__db
        self.__coll = get_existing_coll(self.__db, self.__coll_name)
        return self.__coll

    def create_coll(self, db_name, coll_name):
        """Create new collection with new or existing database"""
        if self.__client is None:
            return None
        try:
            return self.__client.get_database(db_name).create_collection(coll_name)
        except CollectionInvalid:
            logging.error('collection {} already exists in database {}'.format(coll_name, db_name))
            return None

    def _run_operation(self, operation, session):
        """Run one Operation inside `session` and store its (optionally post-processed) result in operation.out."""
        if operation.level == 'client':
            target = self.__client
        elif operation.level == 'db':
            target = self.__db
        elif operation.level == 'coll':
            target = self.coll  # property: re-resolved against the current database
        else:
            raise ValueError('unknown operation level: {!r}'.format(operation.level))
        if target is None:
            raise ValueError('no {} available for operation {!r}'.format(operation.level, operation.operation_name))
        operator = getattr(target, operation.operation_name)
        # a tuple is spread into positional arguments; any other value is the single first argument
        args = operation.args if isinstance(operation.args, tuple) else (operation.args,)
        ops_rst = operator(*args, session=session, **(operation.kwargs or {}))
        operation.out = operation.callback(ops_rst) if operation.callback is not None else ops_rst
        return operation

    def session_pipeline(self, pipeline):
        """Run the operations in order within one causally consistent session.

        A failing step is logged and the pipeline continues with the next one.
        """
        if self.__client is None:
            logging.error('client is None in session_pipeline')
            return None
        result = []
        with self.__client.start_session(causal_consistency=True) as session:
            for operation in pipeline:
                try:
                    self._run_operation(operation, session)
                except Exception as e:
                    logging.error('session_pipeline: {} {} failed, args: {}, kwargs: {}, error: {}'.format(
                        operation.level, operation.operation_name, operation.args, operation.kwargs, repr(e)))
                result.append(operation)
        return result

    # https://api.mongodb.com/python/current/api/pymongo/client_session.html#transactions
    def transaction_pipeline(self, pipeline):
        """Run the operations in one multi-document transaction: all of them commit or none does."""
        if self.__client is None:
            logging.error('client is None in transaction_pipeline')
            return None
        result = []
        with self.__client.start_session(causal_consistency=True) as session:
            with session.start_transaction():
                for operation in pipeline:
                    try:
                        self._run_operation(operation, session)
                    except Exception as e:
                        logging.error('transaction_pipeline: {} {} failed, args: {}, kwargs: {}, error: {}'.format(
                            operation.level, operation.operation_name, operation.args, operation.kwargs, repr(e)))
                        raise  # leaving start_transaction() with an exception aborts the transaction
                    result.append(operation)
        return result

Here are some examples of how to use DBManager.

  • Creating a collection and switching the database or collection:
# get a DBManager instance
dbm = DBManager('mongodb://localhost:27017/admin')  # db_name and coll_name default to 'test'
dbm.create_coll('testDB', 'testCollection')
# change db or coll
dbm.db_name = 'testDB'  # dbm.db becomes testDB; dbm.coll is re-resolved as testDB.<current coll_name>
dbm.coll_name = 'testCollection'  # dbm.coll becomes testDB.testCollection
  • Basic operations (CRUD):
# simple manipulation operation
dbm.coll.insert_one({'hello': 'world'})
print(dbm.coll.find_one())   # {'_id': ObjectId('...'), 'hello': 'world'}
dbm.coll.update_one({'hello': 'world'}, {'$set': {'hello': 'hell'}})  # updates require operators such as $set

# bulk operation
from pymongo import InsertOne, DeleteOne, ReplaceOne
dbm.coll.bulk_write([InsertOne({'y': 1}), DeleteOne({'x': 1}), ReplaceOne({'w': 1}, {'z': 1}, upsert=True)])

# simple managing operation
import pymongo
dbm.coll.create_index([('hello', pymongo.DESCENDING)], background=True)
dbm.client.list_database_names()
dbm.db.list_collection_names()
  • Threads for concurrency, processes for parallelism:
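# thread concurrent
import threading

uri, db_name, coll_name = 'mongodb://localhost:27017/admin', 'testDB', 'testCollection'

def thread_func(uri, db_name, coll_name):
    # a new DBManager per thread avoids races on its mutable db_name/coll_name state;
    # get_mongo_client still hands every thread the same shared client for this URI
    dbm = DBManager(uri, db_name, coll_name)
    # Do something with db.
t = threading.Thread(target=thread_func, args=(uri, db_name, coll_name))
t.start()
t.join()

# multiprocess parallel
import multiprocessing

def process_func(uri, db_name, coll_name):
    # new process, new client with fork=True parameter, and new DBManager instance.
    dbm = DBManager(uri, db_name, coll_name, fork=True)
    # Do something with db.

if __name__ == '__main__':  # required where multiprocessing uses the spawn start method (e.g. Windows)
    proc = multiprocessing.Process(target=process_func, args=(uri, db_name, coll_name))
    proc.start()
    proc.join()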
  • MapReduce:
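# MapReduce (PyMongo 3.x only: map_reduce and inline_map_reduce were removed in PyMongo 4.0; use aggregate() there)
from bson.code import Code
mapper = Code('''
function () {...}
''')
reducer = Code('''
function (key, values) {...}
''')
rst = dbm.coll.inline_map_reduce(mapper, reducer)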
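The mapper and reducer bodies are elided above. As a rough model of what MongoDB does with them, the pure-Python sketch below (the helper `map_reduce` is hypothetical and written in Python rather than server-side JavaScript) groups everything the mapper emits by key and folds each group with the reducer. Because MongoDB may call reduce again on partial results for the same key, the reducer must give the same answer when fed its own output; summing counts has that property. On PyMongo 4.x the same count can be expressed as the aggregation pipeline `[{'$group': {'_id': '$hello', 'count': {'$sum': 1}}}]` passed to `Collection.aggregate()`.

```python
from collections import defaultdict


def map_reduce(docs, mapper, reducer):
    """Pure-Python model of MapReduce: mapper yields (key, value) pairs, reducer folds values per key."""
    groups = defaultdict(list)
    for doc in docs:
        for key, value in mapper(doc):   # plays the role of emit(key, value)
            groups[key].append(value)
    return {key: reducer(key, values) for key, values in groups.items()}


def count_mapper(doc):
    yield doc['hello'], 1


def count_reducer(key, values):
    return sum(values)


docs = [{'hello': 'world'}, {'hello': 'hell'}, {'hello': 'world'}]
counts = map_reduce(docs, count_mapper, count_reducer)
print(counts)  # {'world': 2, 'hell': 1}

# Re-reduce safety: folding a partial result again gives the same answer.
print(count_reducer('world', [count_reducer('world', [1, 1]), 1]) == count_reducer('world', [1, 1, 1]))  # True
```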

Support for MongoDB causally consistent sessions and transactions

MongoDB Reference

  • docs: https://docs.mongodb.com/manual/
  • causal-consistency session: https://docs.mongodb.com/manual/core/read-isolation-consistency-recency/#causal-consistency
  • transactions: https://docs.mongodb.com/manual/core/transactions/#transactions

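A session is a logical representation of a database connection. Starting with MongoDB 3.6, MongoDB introduced client sessions and added support for causal consistency of operations within them. Strictly speaking, then, what DBManager wraps here is a causally consistent session, i.e. client.start_session(causal_consistency=True). The consistency guarantee, however, only holds if the application ensures that just one thread at a time performs operations within a given session. Within a client session, the results of a sequence of reads and writes are causally consistent with the order in which they were executed; the full guarantees require reads with read concern "majority" and writes with write concern "majority". Use cases: read your writes, writes follow reads, monotonic writes and monotonic reads. Client sessions interact with server sessions. Since version 3.6, MongoDB drivers associate every operation with a server session; server sessions are the underlying framework that supports both causal consistency of sequential operations in client sessions and retryable writes.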
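How a driver keeps the read-your-writes guarantee can be shown with a toy model. In a causally consistent session the driver remembers the operationTime returned by the server and sends it as `afterClusterTime` in the read concern of the next read, so a lagging secondary must first catch up to that point. The classes below (`ToyPrimary`, `ToySecondary`, `ToySession`) are hypothetical and ignore elections, majority commit and real waiting; they only show why a read that carries the session's time cannot miss the session's own write.

```python
class ToyPrimary:
    """Hypothetical primary: every write gets a strictly increasing cluster time."""

    def __init__(self):
        self.cluster_time = 0
        self.oplog = []                      # (time, key, value) in write order

    def write(self, key, value):
        self.cluster_time += 1
        self.oplog.append((self.cluster_time, key, value))
        return self.cluster_time             # returned like an operationTime


class ToySecondary:
    """Hypothetical secondary that lags behind until asked to catch up."""

    def __init__(self, primary):
        self.primary = primary
        self.applied_time = 0
        self.data = {}

    def catch_up(self, until):
        for op_time, key, value in self.primary.oplog:
            if self.applied_time < op_time <= until:
                self.data[key] = value
                self.applied_time = op_time

    def read(self, key, after_cluster_time=0):
        # The real server waits until it has applied after_cluster_time; the toy replays the oplog instead.
        if self.applied_time < after_cluster_time:
            self.catch_up(after_cluster_time)
        return self.data.get(key)


class ToySession:
    """Hypothetical causally consistent session: remembers the latest operation time it has seen."""

    def __init__(self):
        self.operation_time = 0

    def advance(self, op_time):
        self.operation_time = max(self.operation_time, op_time)


primary = ToyPrimary()
secondary = ToySecondary(primary)
session = ToySession()

session.advance(primary.write('hello', 'heaven'))
plain = secondary.read('hello')                                              # no causal guarantee
causal = secondary.read('hello', after_cluster_time=session.operation_time)  # read your own write
print(plain, causal)  # None heaven
```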

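In MongoDB, operations on a single document are atomic. Atomicity means that an operation either takes full effect or none at all and cannot be split; in other words, "all or nothing". Starting with MongoDB 4.0, replica sets support atomicity across multiple documents, i.e. multi-document transactions. Within one transaction, operations on multiple documents, even across different databases and collections, are committed together if all of them succeed; if any operation fails, the whole transaction is aborted and its changes to the database are discarded. The results of the operations become visible outside the transaction only after it has been committed; while the transaction is in progress, or if it fails, none of its operations are visible from outside. A standalone mongod does not support transactions, and at the time of writing neither did sharded clusters; MongoDB expected to add transaction support for sharded clusters around version 4.2 (MongoDB 4.2 did introduce distributed transactions on sharded clusters). Also note that multi-document transactions carry a higher performance cost; where the use case allows, prefer single-document operations on embedded documents or arrays.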
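The all-or-nothing and visibility rules above can be modelled in memory. `ToyStore` and `ToyTransaction` are hypothetical and only illustrate the semantics, not MongoDB's snapshot or locking machinery: writes are buffered inside the transaction, become visible to outside readers only when the `with` block exits normally (commit), and are discarded if it exits with an exception (abort). PyMongo's `with session.start_transaction():` follows the same commit-on-success, abort-on-exception pattern, which is what transaction_pipeline relies on.

```python
class ToyStore:
    """Hypothetical in-memory store with all-or-nothing transactions."""

    def __init__(self):
        self.committed = {}

    def read(self, key):
        return self.committed.get(key)   # readers outside a transaction see committed data only

    def transaction(self):
        return ToyTransaction(self)


class ToyTransaction:
    def __init__(self, store):
        self.store = store
        self.pending = {}                # writes buffered until commit

    def write(self, key, value):
        self.pending[key] = value

    def read(self, key):
        # inside the transaction its own uncommitted writes are visible
        if key in self.pending:
            return self.pending[key]
        return self.store.read(key)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self.store.committed.update(self.pending)  # commit: apply all writes at once
        self.pending = {}                              # clear the buffer (on abort, this discards the writes)
        return False                                   # never swallow the exception


store = ToyStore()
with store.transaction() as txn:
    txn.write('a', 1)
    txn.write('b', 2)
    inside, outside = txn.read('a'), store.read('a')
print(inside, outside)                   # 1 None: not visible outside before commit
print(store.read('a'), store.read('b'))  # 1 2

try:
    with store.transaction() as txn:
        txn.write('a', 100)
        raise RuntimeError('second operation failed')
except RuntimeError:
    pass
print(store.read('a'))                   # 1: the aborted write was discarded
```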

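The main use case of both sessions and transactions is a sequence of time-ordered operations, i.e. a pipeline. DBManager therefore provides the methods session_pipeline(self, pipeline) and transaction_pipeline(self, pipeline). First, the class Operation represents one operation: the level it acts on (client, db or coll), the method name, its arguments and a callback applied to the operation's result; the names are self-explanatory. A list of Operation instances forms a pipeline, which is the input of session_pipeline(self, pipeline) and transaction_pipeline(self, pipeline). The output of each step of the pipeline is written to the out attribute of the corresponding Operation instance. In session_pipeline a failing step is logged and the remaining steps still run; in transaction_pipeline the first failure aborts the whole transaction and the exception propagates to the caller.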

class Operation:
    """Operation for constructing sequential pipeline. Only used in DBManager.session_pipeline() or transaction_pipeline().

    Constructor parameters:
    level: <'client' | 'db' | 'coll'> indicating different operation level, MongoClient, Database, Collection
    operation_name: Literally, the name of operation on specific level
    args: positional arguments of the operation, either its first parameter alone or a tuple of its positional parameters.
    kwargs: keyword arguments of the operation.
    callback: callback function for operation result

    Examples:
    # pymongo.collection.Collection.find(filter=None, projection=None, skip=0, limit=0, ...)
    Operation('coll', 'find', {'x': 5}) passes only the filter parameter, equivalent to:
    Operation('coll', 'find', args={'x': 5}) or Operation('coll', 'find', kwargs={'filter': {'x': 5}})

    Operation('coll', 'find', ({'x': 5}, {'_id': 0}), {'limit': 100}), equivalent to:
    Operation('coll', 'find', args=({'x': 5}, {'_id': 0}, 0, 100)), OR
    Operation('coll', 'find', kwargs={'filter': {'x': 5}, 'projection': {'_id': 0}, 'limit': 100})

    def cursor_callback(cursor):
        return cursor.distinct('hello')
    Operation('coll', 'find', kwargs={'limit': 2}, callback=cursor_callback)
    """
    def __init__(self, level, operation_name, args=(), kwargs=None, callback=None):
        self.level = level
        self.operation_name = operation_name
        self.args = args
        self.kwargs = {} if kwargs is None else kwargs  # avoids a shared mutable default
        self.callback = callback
        self.out = None
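The dispatch inside session_pipeline and transaction_pipeline boils down to `getattr` plus argument normalization. The sketch below runs that mechanism against a hypothetical in-memory `FakeCollection` (not PyMongo), with plain (level, name, args, kwargs) tuples standing in for Operation, so the docstring's rule can be checked without a server: a tuple in args is spread over the positional parameters (filter, projection, skip, limit for find), and any other value becomes the single first argument. One consequence: an argument that is itself a tuple has to be wrapped, e.g. `args=(some_tuple,)`.

```python
class FakeCollection:
    """Hypothetical stand-in for pymongo.collection.Collection (in-memory, no server)."""

    def __init__(self):
        self.docs = []

    def insert_one(self, document, session=None):
        self.docs.append(document)
        return {'inserted': document, 'session': session}

    def find(self, filter=None, projection=None, skip=0, limit=0, session=None):
        # projection is accepted for signature compatibility but ignored here
        hits = [d for d in self.docs if all(d.get(k) == v for k, v in (filter or {}).items())]
        hits = hits[skip:]
        return hits[:limit] if limit else hits


def run_pipeline(targets, pipeline, session):
    """Dispatch (level, name, args, kwargs) steps in the same way as session_pipeline."""
    outputs = []
    for level, name, args, kwargs in pipeline:
        operator = getattr(targets[level], name)
        if not isinstance(args, tuple):  # a lone value is the first positional argument
            args = (args,)
        outputs.append(operator(*args, session=session, **kwargs))
    return outputs


coll = FakeCollection()
outs = run_pipeline({'coll': coll}, [
    ('coll', 'insert_one', {'hello': 'heaven'}, {}),
    ('coll', 'insert_one', {'hello': 'god'}, {}),
    ('coll', 'find', ({'hello': 'god'}, None, 0, 1), {}),  # filter, projection, skip, limit
    ('coll', 'find', (), {'filter': {'hello': 'heaven'}}),
], session='session-1')
print(outs[2], outs[3])  # [{'hello': 'god'}] [{'hello': 'heaven'}]
```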

A simple example of a causally consistent session and of a transaction, built on DBManager and Operation:

# causal-consistency session or transaction pipeline operation
def cursor_callback(cursor):
    return cursor.distinct('hello')
op_1 = Operation('coll', 'insert_one', {'hello': 'heaven'})
op_2 = Operation('coll', 'insert_one', {'hello': 'hell'})
op_3 = Operation('coll', 'insert_one', {'hello': 'god'})
op_4 = Operation('coll', 'find', kwargs={'limit': 2}, callback=cursor_callback)
op_5 = Operation('coll', 'find_one', {'hello': 'god'})
pipeline = [op_1, op_2, op_3, op_4, op_5]
ops = dbm.transaction_pipeline(pipeline)  # needs a replica set (MongoDB 4.0+) or a sharded cluster (4.2+)
# ops = dbm.session_pipeline(pipeline)  # can be standalone, replica set or sharded cluster.
for op in ops:
    print(op.out)

[End of main text]

Note: this content is synced from the CSDN blog post of the same name: https://blog.csdn.net/fzlulee/article/details/85944967
