You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 10 Next »

This feature is in beta phase and may still experience issues

Introduction

Object storage (S3) can be configured to send notifications when objects are created or deleted. The notifications can be sent to

  1. HTTP endpoint
  2. Kafka
  3. AMQP

This documentation gives short summary of the functionality with minimalistic python examples. For the full functionality, the user is advised to consult Ceph documentation. While we recommend using python and boto3 library, notifications can be also configured using e.g. curl or AWS cli.    

In order to configure notifications, one must

  1. Create topic to define the endpoint where notifications are sent
  2. Set policy for the topic (optional, by default notifications are public)
  3. Set up notifications for desired bucket and event

Create topic

Topic can be created following: 

#!/usr/bin/python3

import boto3

access_key_id = 'xx' # stored in Cypher
secret_access_key = 'xx' # stored in Cypher
ceph_endpoint = '<ceph endpoint>'
region_name = 'default' # required by boto3, any value works
endpoint = "http://vm-name.tenancy-name.s/f.ewcloud.host" # your own http endpoint
topic_name = 'example-topic'

client = boto3.client('sns',
                     endpoint_url= ceph_endpoint,
                     aws_access_key_id=access_key_id,
                     aws_secret_access_key=secret_access_key,
                     region_name=region_name,
                     config=Config(signature_version='s3'))

client.create_topic(Name=topic_name, Attributes={'persistent': 'True', 'push-endpoint': endpoint})

The <ceph endpoint> shall be replaced with the proper one :

  1. The EUMETSAT endpoint is https://s3.waw3-1.cloudferro.com.
  2. The ECMWF endpoint are (depending on which cloud cluster is linked to the tenancy - this can be checked under Infrastructure->Clouds):


Push endpoint can get values in forms

  • http[s]://<fqdn>[:<port]
  • amqp[s]://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]
  • kafka://[<user>:<password>@]<fqdn>[:<port]

See How to create S3 buckets in Morpheus - European Weather Cloud Knowledge Base - ECMWF Confluence Wiki for available ceph_endpoints.

Note that topic names are public

Set up notifications

Notifications can be configured following:

#!/usr/bin/python3
import boto3

access_key_id='xx' # from Cypher
secret_access_key='xx'  # from Cypher 
ceph_endpoint = "<ceph endpoint>"
region = 'default'  # required by boto3, any value works 

bucket_name = "<bucket name>"

s3 = boto3.client('s3',
                endpoint_url=ceph_endpoint ,
                aws_access_key_id=access_key_id,
                aws_secret_access_key=secret_access_key)

# Create notification configuration
response = s3.put_bucket_notification_configuration(
    Bucket=bucket_name,
    NotificationConfiguration={
        'TopicConfigurations': [
            {
                'Id': 'test-http-topic',
                'TopicArn': 'arn:aws:sns:default::example-topic',
                'Events': ['s3:ObjectCreated:*']
            }
        ]
    }
)

The <ceph endpoint> shall be replaced with the proper one :

  1. The EUMETSAT endpoint is https://s3.waw3-1.cloudferro.com.
  2. The ECMWF endpoint are (depending on which cloud cluster is linked to the tenancy - this can be checked under Infrastructure->Clouds):



TopicArn follows pattern arn:aws:sns:{region}::{topic-name}

Most typical event types are:

  • s3:ObjectCreated:*
  • s3:ObjectRemoved:*

Please consult S3 Bucket Notifications Compatibility — Ceph Documentation for details.

Note, that one can also add filters for objects, e.g.: 

#!/usr/bin/python3
import boto3

access_key_id='xx' # from Cypher
secret_access_key='xx'  # from Cypher 
ceph_endpoint = "<ceph endpoint>"
region = 'default'  # required by boto3, any value works 

bucket_name = "<bucket name>"

s3 = boto3.client('s3',
                endpoint_url=ceph_endpoint ,
                aws_access_key_id=access_key_id,
                aws_secret_access_key=secret_access_key)

# Create notification configuration
response = s3.put_bucket_notification_configuration(
    Bucket=bucket_name,
    NotificationConfiguration={
        'TopicConfigurations': [
            {
                'Id': 'test-http-topic',
                'TopicArn': 'arn:aws:sns:default::example-topic',
                'Events': ['s3:ObjectCreated:*'],
                'Filter': {
                    'Key': {
                        'FilterRules': [
                            {
                                'Name': 'suffix',
                                'Value': 'reflectivity-composite-opera.h5'
                            }
                        ]
                    }
                }             
            }
        ]
    }
)

Note: the "Filter" section with the suffix value example " reflectivity-composite-opera.h5" is optional and could be excluded from the request payload.


See Bucket Operations — Ceph Documentation for more detailed documentation.

List topics

Available topics can be listed following:

#!/usr/bin/python3

import boto3

access_key_id = 'xx' # stored in Cypher
secret_access_key = 'xx' # stored in Cypher
ceph_endpoint = '<ceph endpoint>'
region_name = 'default' # required by boto3, any value works

sns = boto3.client('sns',
  region_name=region_name,
  endpoint_url= ceph_endpoint,
  aws_access_key_id=access_key_id,
  aws_secret_access_key=secret_access_key,
  config=Config(signature_version='s3'))

response = sns.list_topics()

# Print the Topic ARNs
print('All topics:')
for topic in response['Topics']:
  print(' -'+topic['TopicArn'])

Delete topics

Topics can be deleted following:

#!/usr/bin/python3
import boto3

access_key_id='xx' # from Cypher
secret_access_key='xx'  # from Cypher 
ceph_endpoint = "<ceph endpoint>"
region = 'default'  # required by boto3, any value works 

arn = 'arn:aws:sns:default::example-topic'

sns = boto3.client('sns',
                   region_name=region,
                   endpoint_url=ceph_endpoint ,
                   aws_access_key_id=access_key_id,
                   aws_secret_access_key=secret_access_key)

# Delete the SNS topic
response = sns.delete_topic(TopicArn=arn)




  • No labels