Difference between revisions of "Pager Duty API Python"
Jump to navigation
Jump to search
(2 intermediate revisions by the same user not shown) | |||
Line 1: | Line 1: | ||
https://github.com/PagerDuty/pdpyras | https://github.com/PagerDuty/pdpyras | ||
+ | |||
+ | https://github.com/PagerDuty/API_Python_Examples/blob/master/REST_API_v2/Incidents/trigger_incident.py | ||
Line 39: | Line 41: | ||
], | ], | ||
) | ) | ||
+ | ``` | ||
+ | |||
+ | |||
+ | |||
+ | |||
+ | Raw Requests | ||
+ | ``` | ||
+ | import json | ||
+ | import os | ||
+ | import requests | ||
+ | import uuid | ||
+ | |||
+ | PD_DEFAULT_SERVICE_ID = os.environ['PD_DEFAULT_SERVICE_ID'] | ||
+ | PD_API_TOKEN = os.environ['PD_API_TOKEN'] | ||
+ | PD_DEFAULT_FROM = os.environ['PD_DEFAULT_FROM'] | ||
+ | PD_DEFAULT_SERVICE_ID = os.environ['PD_DEFAULT_SERVICE_ID'] | ||
+ | PD_DEFAULT_INCIDENT_KEY = os.environ['PD_DEFAULT_INCIDENT_KEY'] | ||
+ | PD_URL = "https://api.pagerduty.com/" | ||
+ | PD_DEFAULT_USERS = os.environ['PD_DEFAULT_USER'] | ||
+ | |||
+ | |||
+ | |||
+ | headers = { | ||
+ | 'Content-Type': 'application/json', | ||
+ | 'Accept': 'application/vnd.pagerduty+json;version=2', | ||
+ | 'Authorization': 'Token token={token}'.format(token=PD_API_TOKEN), | ||
+ | 'From': PD_DEFAULT_FROM | ||
+ | } | ||
+ | |||
+ | |||
+ | def main(): | ||
+ | # create_incident("test 1", "my details") | ||
+ | # update_incident(183083, "resolved") | ||
+ | update_incident('K172XEHMR1106X', "resolved") | ||
+ | |||
+ | |||
+ | def update_incident(id, status): | ||
+ | url = PD_URL + f"incidents/{id}" | ||
+ | escalation_level = 1 | ||
+ | assigned_to_user = '' | ||
+ | escalation_policy = '' | ||
+ | type = 'incident' | ||
+ | summary = 'Resolved' | ||
+ | payload = { | ||
+ | 'incident': { | ||
+ | 'type': type, | ||
+ | 'summary': summary, | ||
+ | 'status': status, | ||
+ | 'escalation_level': escalation_level, | ||
+ | 'assigned_to_user': assigned_to_user, | ||
+ | 'escalation_policy': escalation_policy | ||
+ | } | ||
+ | } | ||
+ | r = requests.put(url, headers=headers, data=json.dumps(payload)) | ||
+ | print('Status Code: {code}'.format(code=r.status_code)) | ||
+ | print("Reponse JSON:", r.json()) | ||
+ | |||
+ | |||
+ | def create_incident(title, details, service_id=PD_DEFAULT_INCIDENT_KEY): | ||
+ | url = PD_URL + "incidents" | ||
+ | """Triggers an incident via the V2 REST API using sample data.""" | ||
+ | incident_key = str(uuid.uuid4()) | ||
+ | |||
+ | |||
+ | payload = { | ||
+ | "incident": { | ||
+ | "type": "incident", | ||
+ | "title": title, | ||
+ | "service": { | ||
+ | "id": PD_DEFAULT_SERVICE_ID, | ||
+ | "type": "service_reference" | ||
+ | }, | ||
+ | "incident_key": incident_key, | ||
+ | "body": { | ||
+ | "type": "incident_body", | ||
+ | "details": details | ||
+ | } | ||
+ | } | ||
+ | } | ||
+ | |||
+ | r = requests.post(url, headers=headers, data=json.dumps(payload)) | ||
+ | incident_id = r.json()['incident']['id'] | ||
+ | |||
+ | # print('Status Code: {code}'.format(code=r.status_code)) | ||
+ | # print("Reponse JSON:", r.json()) | ||
+ | # print("Reponse JSON:", r.json()['incident']['id']) | ||
+ | code=r.status_code | ||
+ | return code, incident_id | ||
+ | |||
+ | |||
+ | |||
+ | if __name__ == '__main__': | ||
+ | # create_incident() | ||
+ | main() | ||
+ | ``` | ||
+ | |||
+ | |||
+ | # SDK Examples | ||
+ | |||
+ | ``` | ||
+ | import os | ||
+ | from pdpyras import APISession | ||
+ | import sys | ||
+ | import ast | ||
+ | import json | ||
+ | |||
+ | API_TOKEN = os.environ['PD_API_TOKEN'] | ||
+ | PD_DEFAULT_FROM = os.environ['PD_DEFAULT_FROM'] | ||
+ | PD_DEFAULT_USERS = os.environ['PD_DEFAULT_USERS'] | ||
+ | session = APISession(API_TOKEN, default_from=PD_DEFAULT_FROM) | ||
+ | if not isinstance(PD_DEFAULT_USERS, list): | ||
+ | PD_DEFAULT_USERS = ast.literal_eval(PD_DEFAULT_USERS) | ||
+ | users = PD_DEFAULT_USERS | ||
+ | incidents = session.list_all( | ||
+ | 'incidents', | ||
+ | # params={'user_ids[]':users,'statuses[]':['triggered']} | ||
+ | params={'user_ids[]':users,'statuses[]':['acknowledged']} | ||
+ | ) | ||
+ | |||
+ | # Change their state | ||
+ | for i in incidents: | ||
+ | print(i) | ||
+ | i['status'] = 'resolved' | ||
+ | # i['status'] = 'acknowledged' | ||
+ | |||
+ | # PUT the updated list back up to the API | ||
+ | updated_incidents = session.rput('incidents', json=incidents) | ||
``` | ``` |
Latest revision as of 01:06, 8 March 2022
https://github.com/PagerDuty/pdpyras
def resolve_incident(self, device_id, message): url = "https://events.pagerduty.com/v2/enqueue" print(url) headers = { "Accept": "application/vnd.pagerduty+json;version=2", "Authorization": "Token token={}".format(self.pagerDuty_api_key), "From": "xys@assa.com", "Content-Type": "application/json" } payload = { "routing_key": self.pagerDuty_integration_key, "dedup_key": device_id, "event_action": "acknowledge" } r = requests.post(url, data=json.dumps(payload), headers=headers) if r.status_code == 200: print(r.json) return True else: print(r.status_code) print(r.text) return False
session.rput( "incidents", json=[ {'id':'PABC123','type':'incident_reference', 'status':'resolved'}, {'id':'PDEF456','type':'incident_reference', 'status':'resolved'}, ], )
Raw Requests
import json import os import requests import uuid PD_DEFAULT_SERVICE_ID = os.environ['PD_DEFAULT_SERVICE_ID'] PD_API_TOKEN = os.environ['PD_API_TOKEN'] PD_DEFAULT_FROM = os.environ['PD_DEFAULT_FROM'] PD_DEFAULT_SERVICE_ID = os.environ['PD_DEFAULT_SERVICE_ID'] PD_DEFAULT_INCIDENT_KEY = os.environ['PD_DEFAULT_INCIDENT_KEY'] PD_URL = "https://api.pagerduty.com/" PD_DEFAULT_USERS = os.environ['PD_DEFAULT_USER'] headers = { 'Content-Type': 'application/json', 'Accept': 'application/vnd.pagerduty+json;version=2', 'Authorization': 'Token token={token}'.format(token=PD_API_TOKEN), 'From': PD_DEFAULT_FROM } def main(): # create_incident("test 1", "my details") # update_incident(183083, "resolved") update_incident('K172XEHMR1106X', "resolved") def update_incident(id, status): url = PD_URL + f"incidents/{id}" escalation_level = 1 assigned_to_user = '' escalation_policy = '' type = 'incident' summary = 'Resolved' payload = { 'incident': { 'type': type, 'summary': summary, 'status': status, 'escalation_level': escalation_level, 'assigned_to_user': assigned_to_user, 'escalation_policy': escalation_policy } } r = requests.put(url, headers=headers, data=json.dumps(payload)) print('Status Code: {code}'.format(code=r.status_code)) print("Reponse JSON:", r.json()) def create_incident(title, details, service_id=PD_DEFAULT_INCIDENT_KEY): url = PD_URL + "incidents" """Triggers an incident via the V2 REST API using sample data.""" incident_key = str(uuid.uuid4()) payload = { "incident": { "type": "incident", "title": title, "service": { "id": PD_DEFAULT_SERVICE_ID, "type": "service_reference" }, "incident_key": incident_key, "body": { "type": "incident_body", "details": details } } } r = requests.post(url, headers=headers, data=json.dumps(payload)) incident_id = r.json()['incident']['id'] # print('Status Code: {code}'.format(code=r.status_code)) # print("Reponse JSON:", r.json()) # print("Reponse JSON:", r.json()['incident']['id']) code=r.status_code return code, incident_id if __name__ == '__main__': # create_incident() main()
SDK Examples
import os from pdpyras import APISession import sys import ast import json API_TOKEN = os.environ['PD_API_TOKEN'] PD_DEFAULT_FROM = os.environ['PD_DEFAULT_FROM'] PD_DEFAULT_USERS = os.environ['PD_DEFAULT_USERS'] session = APISession(API_TOKEN, default_from=PD_DEFAULT_FROM) if not isinstance(PD_DEFAULT_USERS, list): PD_DEFAULT_USERS = ast.literal_eval(PD_DEFAULT_USERS) users = PD_DEFAULT_USERS incidents = session.list_all( 'incidents', # params={'user_ids[]':users,'statuses[]':['triggered']} params={'user_ids[]':users,'statuses[]':['acknowledged']} ) # Change their state for i in incidents: print(i) i['status'] = 'resolved' # i['status'] = 'acknowledged' # PUT the updated list back up to the API updated_incidents = session.rput('incidents', json=incidents)