PagerDuty API Python
Jump to navigation
Jump to search
https://github.com/PagerDuty/pdpyras
def resolve_incident(self, device_id, message):
    """Send a ``resolve`` event to the PagerDuty Events API v2.

    Args:
        device_id: Sent as the event's ``dedup_key``; it has to match the
            ``dedup_key`` of the alert that should be resolved.
        message: Currently unused; kept so existing callers keep working.

    Returns:
        True when the API answered with a 2xx status, False otherwise.
    """
    url = "https://events.pagerduty.com/v2/enqueue"
    print(url)
    # NOTE(review): the Events API v2 appears to authenticate through the
    # ``routing_key`` in the body; the Authorization/From headers look like
    # REST API leftovers -- confirm before removing them.
    headers = {
        "Accept": "application/vnd.pagerduty+json;version=2",
        "Authorization": "Token token={}".format(self.pagerDuty_api_key),
        "From": "xys@assa.com",
        "Content-Type": "application/json",
    }
    payload = {
        "routing_key": self.pagerDuty_integration_key,
        "dedup_key": device_id,
        # "acknowledge" would leave the alert open; resolving needs "resolve".
        "event_action": "resolve",
    }
    r = requests.post(url, data=json.dumps(payload), headers=headers)
    # The enqueue endpoint replies 202 Accepted on success, so a strict
    # ``== 200`` check reports every accepted event as a failure.
    if 200 <= r.status_code < 300:
        print(r.json())
        return True
    print(r.status_code)
    print(r.text)
    return False
# Bulk-resolve incidents: one reference object per incident ID, sent in a
# single PUT to the "incidents" endpoint.
resolved_refs = [
    {'id': incident_id, 'type': 'incident_reference', 'status': 'resolved'}
    for incident_id in ('PABC123', 'PDEF456')
]
session.rput("incidents", json=resolved_refs)
Raw Requests
import json
import os
import requests
import uuid
# Configuration comes from the environment; a missing variable raises
# KeyError as soon as the module is loaded.
PD_DEFAULT_SERVICE_ID = os.environ['PD_DEFAULT_SERVICE_ID']
PD_API_TOKEN = os.environ['PD_API_TOKEN']
PD_DEFAULT_FROM = os.environ['PD_DEFAULT_FROM']
PD_DEFAULT_INCIDENT_KEY = os.environ['PD_DEFAULT_INCIDENT_KEY']
PD_URL = "https://api.pagerduty.com/"
# NOTE(review): this reads PD_DEFAULT_USER (singular) while the SDK example
# on this page reads PD_DEFAULT_USERS -- confirm which name is intended.
PD_DEFAULT_USERS = os.environ['PD_DEFAULT_USER']

# Headers shared by every REST API request in this script.
headers = {
    'Content-Type': 'application/json',
    'Accept': 'application/vnd.pagerduty+json;version=2',
    'Authorization': 'Token token={token}'.format(token=PD_API_TOKEN),
    'From': PD_DEFAULT_FROM,
}
def main():
    """Run the demo: mark one hard-coded incident as resolved."""
    # Alternative calls that can be swapped in while experimenting:
    #   create_incident("test 1", "my details")
    #   update_incident(183083, "resolved")
    incident_id = 'K172XEHMR1106X'
    update_incident(incident_id, "resolved")
def update_incident(id, status):
    """PUT a status change for one incident to the PagerDuty REST API.

    The request is built from the module-level ``PD_URL`` and ``headers``.
    The HTTP status code and the decoded JSON response are printed; nothing
    is returned.

    Args:
        id: Incident identifier appended to the ``incidents/`` path.
        status: Value sent as the incident's ``status``.
    """
    endpoint = PD_URL + f"incidents/{id}"
    # NOTE(review): apart from ``status`` these fields are fixed
    # placeholders; check them against the "Update an incident" schema.
    incident_fields = {
        'type': 'incident',
        'summary': 'Resolved',
        'status': status,
        'escalation_level': 1,
        'assigned_to_user': '',
        'escalation_policy': '',
    }
    body = json.dumps({'incident': incident_fields})
    response = requests.put(endpoint, headers=headers, data=body)
    print('Status Code: {code}'.format(code=response.status_code))
    print("Reponse JSON:", response.json())
def create_incident(title, details, service_id=PD_DEFAULT_SERVICE_ID):
    """Create (trigger) an incident through the PagerDuty REST API.

    Args:
        title: Title of the new incident.
        details: Free text placed in the incident body.
        service_id: ID of the service the incident is opened on. Defaults
            to ``PD_DEFAULT_SERVICE_ID``, which is the value the request
            always used before this parameter was honoured.

    Returns:
        A ``(status_code, incident_id)`` tuple. ``incident_id`` is None when
        the response does not contain an incident (e.g. on an API error).
    """
    url = PD_URL + "incidents"
    # A fresh key per call so repeated calls are never de-duplicated.
    incident_key = str(uuid.uuid4())
    payload = {
        "incident": {
            "type": "incident",
            "title": title,
            "service": {
                "id": service_id,
                "type": "service_reference",
            },
            "incident_key": incident_key,
            "body": {
                "type": "incident_body",
                "details": details,
            },
        }
    }
    r = requests.post(url, headers=headers, data=json.dumps(payload))
    # Error responses carry no "incident" object (or no JSON at all); report
    # the status code to the caller instead of raising from the lookup.
    try:
        incident_id = r.json()['incident']['id']
    except (ValueError, KeyError, TypeError):
        incident_id = None
    return r.status_code, incident_id
# Script entry point: run the demo only when executed directly, not when
# the module is imported.
if __name__ == '__main__':
    main()
SDK Examples
import os
from pdpyras import APISession
import sys
import ast
import json
# Credentials and defaults are read from the environment.
API_TOKEN = os.environ['PD_API_TOKEN']
PD_DEFAULT_FROM = os.environ['PD_DEFAULT_FROM']
PD_DEFAULT_USERS = os.environ['PD_DEFAULT_USERS']

session = APISession(API_TOKEN, default_from=PD_DEFAULT_FROM)

# Environment values arrive as strings, so the value is parsed as a Python
# literal (presumably a list of user IDs such as "['PABC123']" -- confirm).
if not isinstance(PD_DEFAULT_USERS, list):
    PD_DEFAULT_USERS = ast.literal_eval(PD_DEFAULT_USERS)
users = PD_DEFAULT_USERS

# Fetch the incidents for those users that are currently acknowledged
# (swap in ['triggered'] to select triggered ones instead).
query = {
    'user_ids[]': users,
    'statuses[]': ['acknowledged'],
}
incidents = session.list_all('incidents', params=query)

# Flip every fetched incident to resolved (or 'acknowledged', if preferred).
for incident in incidents:
    print(incident)
    incident['status'] = 'resolved'

# PUT the modified list back to the API in one request.
updated_incidents = session.rput('incidents', json=incidents)