This feature is in beta and may still have issues.
Introduction
Object storage (S3) can be configured to send notifications when objects are created or deleted. The notifications can be sent to:
- HTTP endpoint
- Kafka
- AMQP
This documentation gives a short summary of the functionality with minimal Python examples. For the full functionality, consult the Ceph documentation. While we recommend using Python and the boto3 library, notifications can also be configured using e.g. curl or the AWS CLI.
In order to configure notifications, one must:
- Create a topic to define the endpoint where notifications are sent
- Set a policy for the topic (optional; by default notifications are public)
- Set up notifications for the desired bucket and event
Create topic
A topic can be created as follows:
#!/usr/bin/python3
import boto3
from botocore.client import Config

access_key_id = 'xx'  # stored in Morpheus Cypher
secret_access_key = 'xx'  # stored in Morpheus Cypher
ceph_endpoint = '<ceph endpoint>'
region_name = 'default'  # required by boto3, any value works
endpoint = "http://vm-name.tenancy-name.s/f.ewcloud.host"  # your own http endpoint
topic_name = 'example-topic'

# Topics are managed through the SNS-compatible API of the object store
client = boto3.client('sns',
                      endpoint_url=ceph_endpoint,
                      aws_access_key_id=access_key_id,
                      aws_secret_access_key=secret_access_key,
                      region_name=region_name,
                      config=Config(signature_version='s3'))

# Create the topic; 'push-endpoint' is where the notifications are sent
client.create_topic(Name=topic_name,
                    Attributes={'persistent': 'True', 'push-endpoint': endpoint})
Replace <ceph endpoint> with the appropriate one:
- The EUMETSAT endpoint is https://s3.waw3-1.cloudferro.com.
- The ECMWF endpoint depends on which cloud cluster is linked to the tenancy (this can be checked in Morpheus under Infrastructure->Clouds):
  - CCI1 cluster: https://object-store.os-api.cci1.ecmwf.int
  - CCI2 cluster: https://object-store.os-api.cci2.ecmwf.int
The push endpoint can take one of the following forms:
- http[s]://<fqdn>[:<port>]
- amqp[s]://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]
- kafka://[<user>:<password>@]<fqdn>[:<port>]
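The forms above can be checked before a topic is created, so that a typo in the push endpoint is caught locally. The following is a minimal sketch only: check_push_endpoint is a hypothetical helper (not part of boto3 or Ceph), and it only verifies the scheme, host name and port, not that the endpoint is reachable.

```python
from urllib.parse import urlsplit

# Schemes taken from the push endpoint forms listed above.
ALLOWED_SCHEMES = {"http", "https", "amqp", "amqps", "kafka"}


def check_push_endpoint(url: str) -> str:
    """Return url unchanged if it matches one of the listed forms, else raise ValueError.

    Hypothetical helper for illustration; it performs no network access.
    """
    parts = urlsplit(url)
    if parts.scheme not in ALLOWED_SCHEMES:
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    if not parts.hostname:
        raise ValueError("missing host name")
    # Accessing .port raises ValueError if the port is not a valid number.
    _ = parts.port
    return url
```

The returned value can then be passed as the 'push-endpoint' attribute when creating the topic.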
See How to create S3 buckets in Morpheus - European Weather Cloud Knowledge Base - ECMWF Confluence Wiki for available ceph_endpoints.
Note that topic names are public.
Set up notifications
Notifications can be configured as follows:
#!/usr/bin/python3
import boto3

access_key_id = 'xx'  # from Morpheus Cypher
secret_access_key = 'xx'  # from Morpheus Cypher
ceph_endpoint = "<ceph endpoint>"
region = 'default'  # required by boto3, any value works
bucket_name = "<bucket name>"

s3 = boto3.client('s3',
                  region_name=region,
                  endpoint_url=ceph_endpoint,
                  aws_access_key_id=access_key_id,
                  aws_secret_access_key=secret_access_key)

# Create notification configuration
response = s3.put_bucket_notification_configuration(
    Bucket=bucket_name,
    NotificationConfiguration={
        'TopicConfigurations': [
            {
                'Id': 'test-http-topic',
                'TopicArn': 'arn:aws:sns:default::example-topic',
                'Events': ['s3:ObjectCreated:*']
            }
        ]
    }
)
Replace <ceph endpoint> with the appropriate one:
- The EUMETSAT endpoint is https://s3.waw3-1.cloudferro.com.
- The ECMWF endpoint depends on which cloud cluster is linked to the tenancy (this can be checked in Morpheus under Infrastructure->Clouds):
  - CCI1 cluster: https://object-store.os-api.cci1.ecmwf.int
  - CCI2 cluster: https://object-store.os-api.cci2.ecmwf.int
TopicArn follows the pattern arn:aws:sns:{region}::{topic-name}.
The most typical event types are:
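To avoid typing the ARN by hand, the pattern can be wrapped in a small function. This is a sketch only; topic_arn is a hypothetical helper name, and the default region value 'default' is taken from the examples in this document.

```python
def topic_arn(topic_name: str, region: str = "default") -> str:
    """Build a topic ARN following the pattern arn:aws:sns:{region}::{topic-name}."""
    if not topic_name:
        raise ValueError("topic_name must not be empty")
    # The account field between the two colons is left empty, as in the pattern.
    return f"arn:aws:sns:{region}::{topic_name}"
```

Alternatively, the response of create_topic contains the ARN under the 'TopicArn' key, so it can be read from there instead of being constructed.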
- s3:ObjectCreated:*
- s3:ObjectRemoved:*
Please consult S3 Bucket Notifications Compatibility — Ceph Documentation for details.
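On the receiving side, the event type arrives inside the notification body. The sketch below shows one way to pull out the event name, bucket and object key. It assumes the S3-style record layout described in the Ceph documentation (a top-level 'Records' list whose entries carry 'eventName', 's3.bucket.name' and 's3.object.key'); summarize_records is a hypothetical helper and the sample values used with it are illustrative, not real output.

```python
import json


def summarize_records(body: str) -> list:
    """Return (event name, bucket, key) tuples from a notification body.

    Assumes an S3-style JSON document with a top-level 'Records' list;
    missing fields are returned as empty strings instead of raising.
    """
    payload = json.loads(body)
    summary = []
    for record in payload.get("Records", []):
        s3_part = record.get("s3", {})
        summary.append((
            record.get("eventName", ""),
            s3_part.get("bucket", {}).get("name", ""),
            s3_part.get("object", {}).get("key", ""),
        ))
    return summary
```

An HTTP endpoint would call such a function on the body of each incoming POST request and then act on the reported keys.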
Note that one can also add filters for objects, e.g.:
#!/usr/bin/python3
import boto3

access_key_id = 'xx'  # from Morpheus Cypher
secret_access_key = 'xx'  # from Morpheus Cypher
ceph_endpoint = "<ceph endpoint>"
region = 'default'  # required by boto3, any value works
bucket_name = "<bucket name>"

s3 = boto3.client('s3',
                  region_name=region,
                  endpoint_url=ceph_endpoint,
                  aws_access_key_id=access_key_id,
                  aws_secret_access_key=secret_access_key)

# Create notification configuration, restricted to keys with the given suffix
response = s3.put_bucket_notification_configuration(
    Bucket=bucket_name,
    NotificationConfiguration={
        'TopicConfigurations': [
            {
                'Id': 'test-http-topic',
                'TopicArn': 'arn:aws:sns:default::example-topic',
                'Events': ['s3:ObjectCreated:*'],
                'Filter': {
                    'Key': {
                        'FilterRules': [
                            {
                                'Name': 'suffix',
                                'Value': 'reflectivity-composite-opera.h5'
                            }
                        ]
                    }
                }
            }
        ]
    }
)
Note: the "Filter" section, shown here with the example suffix value "reflectivity-composite-opera.h5", is optional and can be omitted from the request.
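Since the filter is optional, the notification configuration can be assembled by a function that only adds the "Filter" section when a rule is given. This is a minimal sketch; build_notification_config and its parameters are hypothetical names, and the prefix rule is included on the assumption that it is accepted alongside suffix, as in the standard S3 API.

```python
def build_notification_config(topic_arn, events, config_id="test-http-topic",
                              prefix=None, suffix=None):
    """Return a NotificationConfiguration dict for put_bucket_notification_configuration.

    The 'Filter' section is added only if a prefix or suffix is supplied.
    """
    topic_config = {
        "Id": config_id,
        "TopicArn": topic_arn,
        "Events": list(events),
    }
    rules = []
    if prefix:
        rules.append({"Name": "prefix", "Value": prefix})
    if suffix:
        rules.append({"Name": "suffix", "Value": suffix})
    if rules:
        topic_config["Filter"] = {"Key": {"FilterRules": rules}}
    return {"TopicConfigurations": [topic_config]}
```

The result can be passed as the NotificationConfiguration argument in the examples above.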
See Bucket Operations — Ceph Documentation for more detailed documentation.
List topics
Available topics can be listed as follows:
#!/usr/bin/python3
import boto3
from botocore.client import Config

access_key_id = 'xx'  # stored in Morpheus Cypher
secret_access_key = 'xx'  # stored in Morpheus Cypher
ceph_endpoint = '<ceph endpoint>'
region_name = 'default'  # required by boto3, any value works

sns = boto3.client('sns',
                   region_name=region_name,
                   endpoint_url=ceph_endpoint,
                   aws_access_key_id=access_key_id,
                   aws_secret_access_key=secret_access_key,
                   config=Config(signature_version='s3'))

response = sns.list_topics()

# Print the Topic ARNs
print('All topics:')
for topic in response['Topics']:
    print(' -' + topic['TopicArn'])
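If only the topic names are of interest, they can be extracted from the ARNs in the list_topics response. The sketch below assumes the response layout used in the example above (a 'Topics' list of dicts with a 'TopicArn' key); topic_names is a hypothetical helper.

```python
def topic_names(list_topics_response: dict) -> list:
    """Return the topic names found in a list_topics response.

    The name is the last colon-separated field of each TopicArn.
    """
    names = []
    for topic in list_topics_response.get("Topics", []):
        arn = topic.get("TopicArn", "")
        if arn:
            names.append(arn.rsplit(":", 1)[-1])
    return names
```

For example, topic_names(sns.list_topics()) could be used to check whether a topic such as 'example-topic' already exists before creating it.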
Delete topics
Topics can be deleted as follows:
#!/usr/bin/python3
import boto3

access_key_id = 'xx'  # from Morpheus Cypher
secret_access_key = 'xx'  # from Morpheus Cypher
ceph_endpoint = "<ceph endpoint>"
region = 'default'  # required by boto3, any value works
arn = 'arn:aws:sns:default::example-topic'

sns = boto3.client('sns',
                   region_name=region,
                   endpoint_url=ceph_endpoint,
                   aws_access_key_id=access_key_id,
                   aws_secret_access_key=secret_access_key)

# Delete the SNS topic
response = sns.delete_topic(TopicArn=arn)
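Before deleting a topic, it may be worth checking whether a bucket still has notifications that reference it. The following sketch works on the response of the boto3 call get_bucket_notification_configuration, assuming it contains the usual 'TopicConfigurations' list; configs_using_topic is a hypothetical helper.

```python
def configs_using_topic(notification_response: dict, topic_arn: str) -> list:
    """Return the Ids of the topic configurations that point at topic_arn."""
    return [
        cfg.get("Id", "")
        for cfg in notification_response.get("TopicConfigurations", [])
        if cfg.get("TopicArn") == topic_arn
    ]
```

With an S3 client as in the earlier examples, this could be called as configs_using_topic(s3.get_bucket_notification_configuration(Bucket=bucket_name), arn); an empty list means the bucket does not reference the topic.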