Skip to content
Snippets Groups Projects

python wrapper for stacapi requests

  • Clone with SSH
  • Clone with HTTPS
  • Embed
  • Share
    The snippet can be accessed without any authentication.
    Authored by Fabian Wachsmann

    python wrapper for stacapi requests

    Edited
    requests_to_functions.py 2.21 KiB
    import requests
    base_url="http://stac2.cloud.dkrz.de:8081"
    
    def api_delete_collection(cid):
        resp=requests.delete(f"{base_url}/collections/{cid}")
        if resp.status_code not in range(200,209):
            print(f"Status code: {resp.status_code} Cid {cid}")
            raise ValueError(
                f"Error deleting {cid} collection. Message: {resp.text}"
            )
    def api_delete_item(cid,iid):
        resp=requests.delete(f"{base_url}/collections/{cid}/items/{iid}")
        if resp.status_code not in range(200,209):
            print(f"Status code: {resp.status_code} Cid {cid}")
            raise ValueError(
                f"Error deleting {iid} item from collection {cid}. Message: {resp.text}"
            )
    def api_update_collection(json,cid):
        resp=requests.put(f"{base_url}/collections/{cid}", json=json)
        if resp.status_code not in range(200,209):
            print(f"Status code: {resp.status_code} Cid {cid}")
            raise ValueError(
                f"Error updating {cid} collection. Message: {resp.text}"
            )
    def api_update_item(json,cid,iid):
        resp=requests.put(f"{base_url}/collections/{cid}/items/{iid}", json=json)
        if resp.status_code not in range(200,209):
            print(f"Status code: {resp.status_code} Cid {cid}")
            raise ValueError(
                f"Error updating item {iid} from {cid} collection. Message: {resp.text}"
            )
    def api_create_collection(json):
        resp=requests.post(f"{base_url}/collections", json=json)
        if resp.status_code not in range(200,209):
            #print(f"Status code: {resp.status_code} Cid {cid}")
            raise ValueError(
                f"Error ingesting collection. Message: {resp.text}"
            )
            
    def api_create_item(json,cid):
        resp=requests.post(f"{base_url}/collections/{cid}/items", json=json)
        if resp.status_code not in range(200,209):
            #print(f"Status code: {resp.status_code} Cid {cid}")
            raise ValueError(
                f"Error ingesting item for collection {cid}. Message: {resp.text}"
            )
    def api_create_or_update_collection(json,cid):
        try:
            api_create_collection(json)
        except:
            api_update_collection(json,cid)
    
    def api_create_or_update_item(json,cid,iid):
        try:
            api_create_item(json,cid)
        except:
            api_update_item(json,cid,iid)
    0% Loading or .
    You are about to add 0 people to the discussion. Proceed with caution.
    Finish editing this message first!
    Please register or to comment