diff options
Diffstat (limited to 'external_libs/python/pyzmq-14.7.0/examples/mongodb')
-rw-r--r-- | external_libs/python/pyzmq-14.7.0/examples/mongodb/client.py | 46 | ||||
-rw-r--r-- | external_libs/python/pyzmq-14.7.0/examples/mongodb/controller.py | 91 |
2 files changed, 137 insertions, 0 deletions
diff --git a/external_libs/python/pyzmq-14.7.0/examples/mongodb/client.py b/external_libs/python/pyzmq-14.7.0/examples/mongodb/client.py new file mode 100644 index 00000000..839dce71 --- /dev/null +++ b/external_libs/python/pyzmq-14.7.0/examples/mongodb/client.py @@ -0,0 +1,46 @@ +#----------------------------------------------------------------------------- +# Copyright (c) 2010 Justin Riley +# +# Distributed under the terms of the New BSD License. The full license is in +# the file COPYING.BSD, distributed as part of this software. +#----------------------------------------------------------------------------- + +import json +import zmq + +class MongoZMQClient(object): + """ + Client that connects with MongoZMQ server to add/fetch docs + """ + + def __init__(self, connect_addr='tcp://127.0.0.1:5000'): + self._context = zmq.Context() + self._socket = self._context.socket(zmq.DEALER) + self._socket.connect(connect_addr) + + def _send_recv_msg(self, msg): + self._socket.send_multipart(msg) + return self._socket.recv_multipart()[0] + + def get_doc(self, keys): + msg = ['get', json.dumps(keys)] + json_str = self._send_recv_msg(msg) + return json.loads(json_str) + + def add_doc(self, doc): + msg = ['add', json.dumps(doc)] + return self._send_recv_msg(msg) + +def main(): + client = MongoZMQClient() + for i in range(10): + doc = {'job': str(i)} + print "Adding doc", doc + print client.add_doc(doc) + for i in range(10): + query = {'job': str(i)} + print "Getting doc matching query:", query + print client.get_doc(query) + +if __name__ == "__main__": + main() diff --git a/external_libs/python/pyzmq-14.7.0/examples/mongodb/controller.py b/external_libs/python/pyzmq-14.7.0/examples/mongodb/controller.py new file mode 100644 index 00000000..e154f1c5 --- /dev/null +++ b/external_libs/python/pyzmq-14.7.0/examples/mongodb/controller.py @@ -0,0 +1,91 @@ +#----------------------------------------------------------------------------- +# Copyright (c) 2010 Justin Riley +# +# Distributed under the terms of the New BSD License. The full license is in +# the file COPYING.BSD, distributed as part of this software. +#----------------------------------------------------------------------------- + +import sys +import zmq +import pymongo +import pymongo.json_util +import json + +class MongoZMQ(object): + """ + ZMQ server that adds/fetches documents (ie dictionaries) to a MongoDB. + + NOTE: mongod must be started before using this class + """ + + def __init__(self, db_name, table_name, bind_addr="tcp://127.0.0.1:5000"): + """ + bind_addr: address to bind zmq socket on + db_name: name of database to write to (created if doesnt exist) + table_name: name of mongodb 'table' in the db to write to (created if doesnt exist) + """ + self._bind_addr = bind_addr + self._db_name = db_name + self._table_name = table_name + self._conn = pymongo.Connection() + self._db = self._conn[self._db_name] + self._table = self._db[self._table_name] + + def _doc_to_json(self, doc): + return json.dumps(doc,default=pymongo.json_util.default) + + def add_document(self, doc): + """ + Inserts a document (dictionary) into mongo database table + """ + print 'adding docment %s' % (doc) + try: + self._table.insert(doc) + except Exception,e: + return 'Error: %s' % e + + def get_document_by_keys(self, keys): + """ + Attempts to return a single document from database table that matches + each key/value in keys dictionary. + """ + print 'attempting to retrieve document using keys: %s' % keys + try: + return self._table.find_one(keys) + except Exception,e: + return 'Error: %s' % e + + def start(self): + context = zmq.Context() + socket = context.socket(zmq.ROUTER) + socket.bind(self._bind_addr) + while True: + msg = socket.recv_multipart() + print "Received msg: ", msg + if len(msg) != 3: + error_msg = 'invalid message received: %s' % msg + print error_msg + reply = [msg[0], error_msg] + socket.send_multipart(reply) + continue + id = msg[0] + operation = msg[1] + contents = json.loads(msg[2]) + # always send back the id with ROUTER + reply = [id] + if operation == 'add': + self.add_document(contents) + reply.append("success") + elif operation == 'get': + doc = self.get_document_by_keys(contents) + json_doc = self._doc_to_json(doc) + reply.append(json_doc) + else: + print 'unknown request' + socket.send_multipart(reply) + +def main(): + MongoZMQ('ipcontroller','jobs').start() + +if __name__ == "__main__": + main() |