diff --git a/README.md b/README.md
index c13d64c..42721bf 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,34 @@
-# mongo_plugin
\ No newline at end of file
+# mongo_plugin
+
+MongoDB operators and hooks for Airflow
+
+This plugin contains a hook for connecting to MongoDB from Airflow, along with operators for syncing data between MongoDB and Amazon S3 or Google Cloud Storage:
+
+* MongoHook
+* S3ToMongoOperator
+* MongoToS3Operator
+* MongoToGCSOperator
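+
+Below is a minimal sketch of how the GCS operator might be wired into a DAG. The connection ids, database, collection, bucket, and object path are placeholders rather than values shipped with this plugin, and the example assumes the package is importable as `mongo_plugin`:
+
+```python
+from datetime import datetime
+
+from airflow import DAG
+from mongo_plugin.operators.mongo_to_gcs_operator import MongoToGCSOperator
+
+dag = DAG('mongo_to_gcs_example',
+          start_date=datetime(2018, 1, 1),
+          schedule_interval='@daily')
+
+export_users = MongoToGCSOperator(
+    task_id='export_users',
+    mongo_conn_id='mongo_default',          # Airflow connection to MongoDB (placeholder)
+    mongo_database='my_db',                 # placeholder source database
+    mongo_collection='users',               # placeholder source collection
+    mongo_query={},                         # a dict runs find(), a list runs an aggregate pipeline
+    gcs_conn_id='google_cloud_default',     # Airflow connection to GCS (placeholder)
+    gcs_bucket='my-bucket',                 # placeholder destination bucket
+    gcs_object='exports/users.json',        # destination object path
+    dag=dag)
+```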
diff --git a/__init__.py b/__init__.py
index 6b5026f..d57ecf7 100644
--- a/__init__.py
+++ b/__init__.py
@@ -2,11 +2,12 @@
 from mongo_plugin.hooks.mongo_hook import MongoHook
 from mongo_plugin.operators.s3_to_mongo_operator import S3ToMongoOperator
 from mongo_plugin.operators.mongo_to_s3_operator import MongoToS3Operator
+from mongo_plugin.operators.mongo_to_gcs_operator import MongoToGCSOperator
 
 
 class MongoPlugin(AirflowPlugin):
     name = "MongoPlugin"
-    operators = [MongoToS3Operator, S3ToMongoOperator]
+    operators = [MongoToS3Operator, S3ToMongoOperator, MongoToGCSOperator]
     hooks = [MongoHook]
     executors = []
     macros = []
diff --git a/__pycache__/__init__.cpython-36.pyc b/__pycache__/__init__.cpython-36.pyc
new file mode 100644
index 0000000..4d58572
Binary files /dev/null and b/__pycache__/__init__.cpython-36.pyc differ
diff --git a/hooks/__pycache__/__init__.cpython-36.pyc b/hooks/__pycache__/__init__.cpython-36.pyc
new file mode 100644
index 0000000..97003ee
Binary files /dev/null and b/hooks/__pycache__/__init__.cpython-36.pyc differ
diff --git a/hooks/__pycache__/mongo_hook.cpython-36.pyc b/hooks/__pycache__/mongo_hook.cpython-36.pyc
new file mode 100644
index 0000000..ee436ed
Binary files /dev/null and b/hooks/__pycache__/mongo_hook.cpython-36.pyc differ
diff --git a/operators/__pycache__/__init__.cpython-36.pyc b/operators/__pycache__/__init__.cpython-36.pyc
new file mode 100644
index 0000000..af70d7a
Binary files /dev/null and b/operators/__pycache__/__init__.cpython-36.pyc differ
diff --git a/operators/__pycache__/mongo_to_gcs_operator.cpython-36.pyc b/operators/__pycache__/mongo_to_gcs_operator.cpython-36.pyc
new file mode 100644
index 0000000..7d7f2dd
Binary files /dev/null and b/operators/__pycache__/mongo_to_gcs_operator.cpython-36.pyc differ
diff --git a/operators/__pycache__/mongo_to_s3_operator.cpython-36.pyc b/operators/__pycache__/mongo_to_s3_operator.cpython-36.pyc
new file mode 100644
index 0000000..393ebe7
Binary files /dev/null and b/operators/__pycache__/mongo_to_s3_operator.cpython-36.pyc differ
diff --git a/operators/__pycache__/s3_to_mongo_operator.cpython-36.pyc b/operators/__pycache__/s3_to_mongo_operator.cpython-36.pyc
new file mode 100644
index 0000000..7b20415
Binary files /dev/null and b/operators/__pycache__/s3_to_mongo_operator.cpython-36.pyc differ
diff --git a/operators/mongo_to_gcs_operator.py b/operators/mongo_to_gcs_operator.py
new file mode 100644
index 0000000..32938f4
--- /dev/null
+++ b/operators/mongo_to_gcs_operator.py
@@ -0,0 +1,95 @@
+from bson import json_util
+import json
+import os
+
+from mongo_plugin.hooks.mongo_hook import MongoHook
+from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
+from airflow.models import BaseOperator
+
+
+class MongoToGCSOperator(BaseOperator):
+    """
+    Mongo -> GCS: writes the results of a Mongo query to an object in Google Cloud Storage.
+    :param mongo_conn_id: The source mongo connection id.
+    :type mongo_conn_id: string
+    :param mongo_collection: The source mongo collection.
+    :type mongo_collection: string
+    :param mongo_database: The source mongo database.
+    :type mongo_database: string
+    :param mongo_query: The mongo query to run; a dict is passed to find(), a list is run as an aggregate pipeline.
+    :type mongo_query: dict or list
+    :param gcs_bucket: The destination gcs bucket.
+    :type gcs_bucket: string
+    :param gcs_object: The destination gcs filepath.
+    :type gcs_object: string
+    :param gcs_conn_id: The destination gcs connection id.
+    :type gcs_conn_id: string
+    """
+
+    # TODO This currently sets job = queued and locks job
+    template_fields = ['gcs_object', 'mongo_query']
+
+    def __init__(self,
+                 mongo_conn_id,
+                 mongo_collection,
+                 mongo_database,
+                 mongo_query,
+                 gcs_bucket,
+                 gcs_object,
+                 gcs_conn_id,
+                 *args, **kwargs):
+        super(MongoToGCSOperator, self).__init__(*args, **kwargs)
+        # Conn Ids
+        self.mongo_conn_id = mongo_conn_id
+        self.gcs_conn_id = gcs_conn_id
+        # Mongo Query Settings
+        self.mongo_db = mongo_database
+        self.mongo_collection = mongo_collection
+        # Grab query and determine if we need to run an aggregate pipeline
+        self.mongo_query = mongo_query
+        self.is_pipeline = isinstance(self.mongo_query, list)
+
+        # GCS Settings
+        self.gcs_bucket = gcs_bucket
+        self.gcs_object = gcs_object
+
+        # KWARGS
+        self.replace = kwargs.pop('replace', False)
+
+    def execute(self, context):
+        """
+        Executed by task_instance at runtime
+        """
+        mongo_conn = MongoHook(self.mongo_conn_id).get_conn()
+        gcs_conn = GoogleCloudStorageHook(self.gcs_conn_id)
+
+        # Grab collection and execute query according to whether or not it is a pipeline
+        collection = mongo_conn.get_database(self.mongo_db).get_collection(self.mongo_collection)
+        results = collection.aggregate(self.mongo_query) if self.is_pipeline else collection.find(self.mongo_query)
+
+        # Performs transform then stringifies the docs results into json format
+        docs_str = self._stringify(self.transform(results))
+        with open("__temp__", "w") as fid:
+            fid.write(docs_str)
+
+        gcs_conn.upload(self.gcs_bucket, self.gcs_object, "__temp__")
+
+        # Clean up the temporary file once the upload has finished
+        os.remove("__temp__")
+
+    def _stringify(self, iterable, joinable='\n'):
+        """
+        Takes an iterable (pymongo Cursor or list) containing dictionaries and
+        returns a single string of newline-joined JSON documents.
+        """
+        return joinable.join([json.dumps(doc, default=json_util.default) for doc in iterable])
+
+    def transform(self, docs):
+        """
+        Processes a pymongo cursor and returns a single array with each element
+        being a JSON serializable dictionary.
+        MongoToGCSOperator.transform() assumes no processing is needed,
+        i.e. docs is a pymongo cursor of documents and the cursor just needs to
+        be converted into an array.
+        """
+        return [doc for doc in docs]