Difference between revisions of "Pager Duty API Python"

From UVOO Tech Wiki
Jump to navigation Jump to search
 
(2 intermediate revisions by the same user not shown)
Line 1: Line 1:
 
https://github.com/PagerDuty/pdpyras
 
https://github.com/PagerDuty/pdpyras
 +
 +
https://github.com/PagerDuty/API_Python_Examples/blob/master/REST_API_v2/Incidents/trigger_incident.py
  
  
Line 39: Line 41:
 
     ],
 
     ],
 
)
 
)
 +
```
 +
 +
 +
 +
 +
Raw Requests
 +
```
 +
import json
 +
import os
 +
import requests
 +
import uuid
 +
 +
PD_DEFAULT_SERVICE_ID = os.environ['PD_DEFAULT_SERVICE_ID']
 +
PD_API_TOKEN = os.environ['PD_API_TOKEN']
 +
PD_DEFAULT_FROM = os.environ['PD_DEFAULT_FROM']
 +
PD_DEFAULT_SERVICE_ID = os.environ['PD_DEFAULT_SERVICE_ID']
 +
PD_DEFAULT_INCIDENT_KEY = os.environ['PD_DEFAULT_INCIDENT_KEY']
 +
PD_URL = "https://api.pagerduty.com/"
 +
PD_DEFAULT_USERS = os.environ['PD_DEFAULT_USER']
 +
 +
 +
 +
headers = {
 +
    'Content-Type': 'application/json',
 +
    'Accept': 'application/vnd.pagerduty+json;version=2',
 +
    'Authorization': 'Token token={token}'.format(token=PD_API_TOKEN),
 +
    'From': PD_DEFAULT_FROM
 +
}
 +
 +
 +
def main():
 +
    # create_incident("test 1", "my details")
 +
    # update_incident(183083, "resolved")
 +
    update_incident('K172XEHMR1106X', "resolved")
 +
 +
 +
def update_incident(id, status):
 +
    url = PD_URL + f"incidents/{id}"
 +
    escalation_level = 1
 +
    assigned_to_user = ''
 +
    escalation_policy = ''
 +
    type = 'incident'
 +
    summary = 'Resolved'
 +
    payload = {
 +
        'incident': {
 +
            'type': type,
 +
            'summary': summary,
 +
            'status': status,
 +
            'escalation_level': escalation_level,
 +
            'assigned_to_user': assigned_to_user,
 +
            'escalation_policy': escalation_policy
 +
        }
 +
    }
 +
    r = requests.put(url, headers=headers, data=json.dumps(payload))
 +
    print('Status Code: {code}'.format(code=r.status_code))
 +
    print("Reponse JSON:", r.json())
 +
 +
 +
def create_incident(title, details, service_id=PD_DEFAULT_INCIDENT_KEY):
 +
    url = PD_URL + "incidents"
 +
    """Triggers an incident via the V2 REST API using sample data."""
 +
    incident_key = str(uuid.uuid4())
 +
 +
 +
    payload = {
 +
        "incident": {
 +
            "type": "incident",
 +
            "title": title,
 +
            "service": {
 +
                "id": PD_DEFAULT_SERVICE_ID,
 +
                "type": "service_reference"
 +
            },
 +
            "incident_key": incident_key,
 +
            "body": {
 +
                "type": "incident_body",
 +
                "details": details
 +
            }
 +
          }
 +
        }
 +
 +
    r = requests.post(url, headers=headers, data=json.dumps(payload))
 +
    incident_id = r.json()['incident']['id']
 +
 +
    # print('Status Code: {code}'.format(code=r.status_code))
 +
    # print("Reponse JSON:", r.json())
 +
    # print("Reponse JSON:", r.json()['incident']['id'])
 +
    code=r.status_code
 +
    return code, incident_id
 +
 +
 +
 +
if __name__ == '__main__':
 +
    # create_incident()
 +
    main()
 +
```
 +
 +
 +
# SDK Examples
 +
 +
```
 +
import os
 +
from pdpyras import APISession
 +
import sys
 +
import ast
 +
import json
 +
 +
API_TOKEN = os.environ['PD_API_TOKEN']
 +
PD_DEFAULT_FROM = os.environ['PD_DEFAULT_FROM']
 +
PD_DEFAULT_USERS = os.environ['PD_DEFAULT_USERS']
 +
session = APISession(API_TOKEN, default_from=PD_DEFAULT_FROM)
 +
if not isinstance(PD_DEFAULT_USERS, list):
 +
    PD_DEFAULT_USERS = ast.literal_eval(PD_DEFAULT_USERS)
 +
users = PD_DEFAULT_USERS
 +
incidents = session.list_all(
 +
    'incidents',
 +
    # params={'user_ids[]':users,'statuses[]':['triggered']}
 +
    params={'user_ids[]':users,'statuses[]':['acknowledged']}
 +
)
 +
 +
# Change their state
 +
for i in incidents:
 +
    print(i)
 +
    i['status'] = 'resolved'
 +
    # i['status'] = 'acknowledged'
 +
 +
# PUT the updated list back up to the API
 +
updated_incidents = session.rput('incidents', json=incidents)
 
```
 
```

Latest revision as of 01:06, 8 March 2022

https://github.com/PagerDuty/pdpyras

https://github.com/PagerDuty/API_Python_Examples/blob/master/REST_API_v2/Incidents/trigger_incident.py

https://community.pagerduty.com/forum/t/resolving-an-incident-using-pagerduty-events-v2-or-incident-api/1880

def resolve_incident(self, device_id, message):
    url = "https://events.pagerduty.com/v2/enqueue"
    print(url)

    headers = {
        "Accept": "application/vnd.pagerduty+json;version=2",
        "Authorization": "Token token={}".format(self.pagerDuty_api_key),
        "From": "xys@assa.com",
        "Content-Type": "application/json"
    }
    payload = {
        "routing_key": self.pagerDuty_integration_key,
        "dedup_key": device_id,
        "event_action": "acknowledge"
        }

    r = requests.post(url, data=json.dumps(payload), headers=headers)
    if r.status_code == 200:
        print(r.json)
        return True
    else:
        print(r.status_code)
        print(r.text)
        return False
session.rput(
    "incidents",
    json=[
      {'id':'PABC123','type':'incident_reference', 'status':'resolved'},
      {'id':'PDEF456','type':'incident_reference', 'status':'resolved'},
    ],
)

Raw Requests

import json
import os
import requests
import uuid

PD_DEFAULT_SERVICE_ID = os.environ['PD_DEFAULT_SERVICE_ID']
PD_API_TOKEN = os.environ['PD_API_TOKEN']
PD_DEFAULT_FROM = os.environ['PD_DEFAULT_FROM']
PD_DEFAULT_SERVICE_ID = os.environ['PD_DEFAULT_SERVICE_ID']
PD_DEFAULT_INCIDENT_KEY = os.environ['PD_DEFAULT_INCIDENT_KEY']
PD_URL = "https://api.pagerduty.com/"
PD_DEFAULT_USERS = os.environ['PD_DEFAULT_USER']



headers = {
    'Content-Type': 'application/json',
    'Accept': 'application/vnd.pagerduty+json;version=2',
    'Authorization': 'Token token={token}'.format(token=PD_API_TOKEN),
    'From': PD_DEFAULT_FROM
}


def main():
    # create_incident("test 1", "my details")
    # update_incident(183083, "resolved")
    update_incident('K172XEHMR1106X', "resolved")


def update_incident(id, status):
    url = PD_URL + f"incidents/{id}"
    escalation_level = 1
    assigned_to_user = ''
    escalation_policy = ''
    type = 'incident'
    summary = 'Resolved'
    payload = {
        'incident': {
            'type': type,
            'summary': summary,
            'status': status,
            'escalation_level': escalation_level,
            'assigned_to_user': assigned_to_user,
            'escalation_policy': escalation_policy
        }
    }
    r = requests.put(url, headers=headers, data=json.dumps(payload))
    print('Status Code: {code}'.format(code=r.status_code))
    print("Reponse JSON:", r.json())


def create_incident(title, details, service_id=PD_DEFAULT_INCIDENT_KEY):
    url = PD_URL + "incidents"
    """Triggers an incident via the V2 REST API using sample data."""
    incident_key = str(uuid.uuid4())


    payload = {
        "incident": {
            "type": "incident",
            "title": title,
            "service": {
                "id": PD_DEFAULT_SERVICE_ID,
                "type": "service_reference"
            },
            "incident_key": incident_key,
            "body": {
                "type": "incident_body",
                "details": details
            }
          }
        }

    r = requests.post(url, headers=headers, data=json.dumps(payload))
    incident_id = r.json()['incident']['id']

    # print('Status Code: {code}'.format(code=r.status_code))
    # print("Reponse JSON:", r.json())
    # print("Reponse JSON:", r.json()['incident']['id'])
    code=r.status_code
    return code, incident_id



if __name__ == '__main__':
    # create_incident()
    main()

SDK Examples

import os
from pdpyras import APISession
import sys
import ast
import json

API_TOKEN = os.environ['PD_API_TOKEN']
PD_DEFAULT_FROM = os.environ['PD_DEFAULT_FROM']
PD_DEFAULT_USERS = os.environ['PD_DEFAULT_USERS']
session = APISession(API_TOKEN, default_from=PD_DEFAULT_FROM)
if not isinstance(PD_DEFAULT_USERS, list):
    PD_DEFAULT_USERS = ast.literal_eval(PD_DEFAULT_USERS)
users = PD_DEFAULT_USERS
incidents = session.list_all(
    'incidents',
    # params={'user_ids[]':users,'statuses[]':['triggered']}
    params={'user_ids[]':users,'statuses[]':['acknowledged']}
)

# Change their state
for i in incidents:
    print(i)
    i['status'] = 'resolved'
    # i['status'] = 'acknowledged'

# PUT the updated list back up to the API
updated_incidents = session.rput('incidents', json=incidents)