דלג לתוכן הראשי

ניהול משאבי מחשוב ונתונים ב-Qiskit Serverless

גרסאות חבילות

הקוד בדף זה פותח באמצעות הדרישות הבאות. אנו ממליצים להשתמש בגרסאות אלו או בגרסאות חדשות יותר.

qiskit[all]~=2.0.0
qiskit-ibm-runtime~=0.37.0
qiskit-serverless~=0.22.0

עם Qiskit Serverless, תוכל לנהל מחשוב ונתונים לאורך תבנית Qiskit שלך, כולל מעבדי CPU, מעבדי QPU ומאיצי מחשוב אחרים.

הגדרת סטטוסים מפורטים

לעומסי עבודה של Serverless יש מספר שלבים לאורך תהליך עבודה. כברירת מחדל, הסטטוסים הבאים ניתנים לצפייה עם job.status():

  • QUEUED: עומס העבודה נמצא בתור למשאבים קלאסיים
  • INITIALIZING: עומס העבודה מוגדר
  • RUNNING: עומס העבודה רץ כרגע על משאבים קלאסיים
  • DONE: עומס העבודה הושלם בהצלחה

תוכל גם להגדיר סטטוסים מותאמים אישית המתארים בפירוט רב יותר את שלב תהליך העבודה הספציפי, כדלקמן.

# Added by doQumentation — required packages for this notebook
!pip install -q qiskit qiskit-ibm-runtime qiskit-serverless
# This cell is hidden from users, it just creates a new folder
from pathlib import Path

Path("./source_files").mkdir(exist_ok=True)
%%writefile ./source_files/status_example.py

from qiskit_serverless import update_status, Job

## If your function has a mapping stage, particularly application functions, you can set the status to "RUNNING: MAPPING" as follows:
update_status(Job.MAPPING)

## While handling transpilation, error suppression, and so forth, you can set the status to "RUNNING: OPTIMIZING_FOR_HARDWARE":
update_status(Job.OPTIMIZING_HARDWARE)

## After you submit jobs to Qiskit Runtime, the underlying quantum job will be queued. You can set status to "RUNNING: WAITING_FOR_QPU":
update_status(Job.WAITING_QPU)

## When the Qiskit Runtime job starts running on the QPU, set the following status "RUNNING: EXECUTING_QPU":
update_status(Job.EXECUTING_QPU)

## Once QPU is completed and post-processing has begun, set the status "RUNNING: POST_PROCESSING":
update_status(Job.POST_PROCESSING)
Writing ./source_files/status_example.py

לאחר השלמה מוצלחת של עומס עבודה זה (עם save_result()), סטטוס זה יתעדכן אוטומטית ל-DONE.

תהליכי עבודה מקביליים

למשימות קלאסיות שניתן לבצע במקביל, השתמש בדקורטור @distribute_task להגדרת דרישות המחשוב הנדרשות לביצוע משימה. התחל בהיזכרות בדוגמת transpile_remote.py מנושא כתיבת תוכנית Qiskit Serverless הראשונה שלך עם הקוד הבא.

הקוד הבא דורש שכבר שמרת את האישורים שלך.

%%writefile ./source_files/transpile_remote.py

from qiskit.transpiler import generate_preset_pass_manager
from qiskit_ibm_runtime import QiskitRuntimeService
from qiskit_serverless import distribute_task

service = QiskitRuntimeService()

@distribute_task(target={"cpu": 1})
def transpile_remote(circuit, optimization_level, backend):
"""Transpiles an abstract circuit (or list of circuits) into an ISA circuit for a given backend."""
pass_manager = generate_preset_pass_manager(
optimization_level=optimization_level,
backend=service.backend(backend)
)
isa_circuit = pass_manager.run(circuit)
return isa_circuit
Writing ./source_files/transpile_remote.py

בדוגמה זו, קישטת את הפונקציה transpile_remote() עם @distribute_task(target={"cpu": 1}). בעת הרצה, פעולה זו יוצרת משימת worker מקבילית אסינכרונית עם ליבת CPU יחידה, ומחזירה עם הפניה למעקב אחר ה-worker. כדי לאחזר את התוצאה, העבר את ההפניה לפונקציה get(). נוכל להשתמש בזה כדי להריץ מספר משימות מקביליות:

%%writefile --append ./source_files/transpile_remote.py

from time import time
from qiskit_serverless import get, get_arguments, save_result, update_status, Job

# Get arguments
arguments = get_arguments()
circuit = arguments.get("circuit")
optimization_level = arguments.get("optimization_level")
backend = arguments.get("backend")
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Start distributed transpilation

update_status(Job.OPTIMIZING_HARDWARE)

start_time = time()
transpile_worker_references = [
transpile_remote(circuit, optimization_level, backend)
for circuit in arguments.get("circuit_list")
]

transpiled_circuits = get(transpile_worker_references)
end_time = time()
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Save result, with metadata

result = {
"circuits": transpiled_circuits,
"metadata": {
"resource_usage": {
"RUNNING: OPTIMIZING_FOR_HARDWARE": {
"CPU_TIME": end_time - start_time,
"QPU_TIME": 0,
},
}
},
}

save_result(result)
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It uploads the serverless program and checks it runs.

def test_serverless_job(title, entrypoint):
# Import in function to stop them interfering with user-facing code
from qiskit.circuit.random import random_circuit
from qiskit_serverless import IBMServerlessClient, QiskitFunction
import time
import uuid

title += "_" + uuid.uuid4().hex[:8]
serverless = IBMServerlessClient()
transpile_remote_demo = QiskitFunction(
title=title,
entrypoint=entrypoint,
working_dir="./source_files/",
)
serverless.upload(transpile_remote_demo)
job = serverless.get(title).run(
circuit=random_circuit(3, 3),
circuit_list=[random_circuit(3, 3) for _ in range(3)],
backend="ibm_torino",
optimization_level=1,
)
for retry in range(25):
time.sleep(5)
status = job.status()
if status == "DONE":
print("Job completed successfully")
return
if status not in [
"QUEUED",
"INITIALIZING",
"RUNNING",
"RUNNING: OPTIMIZING_FOR_HARDWARE",
"DONE",
]:
raise Exception(
f"Unexpected job status '{status}'.\nHere's the logs:\n"
+ job.logs()
)
print(f"Waiting for job (status '{status}')")
raise Exception("Job did not complete in time")

test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully

חקירת תצורות משימות שונות

תוכל להקצות באופן גמיש CPU, GPU וזיכרון למשימות שלך באמצעות @distribute_task(). עבור Qiskit Serverless על IBM Quantum® Platform, כל תוכנית מצוידת ב-16 ליבות CPU ו-32 GB RAM, שניתן להקצות באופן דינמי לפי הצורך.

ניתן להקצות ליבות CPU כליבות CPU מלאות, או אפילו הקצאות חלקיות, כפי שמוצג בהמשך.

זיכרון מוקצה במספר בתים. זכור שיש 1024 בתים בקילובייט, 1024 קילובייטים במגהבייט, ו-1024 מגהבייטים בגיגהבייט. כדי להקצות 2 GB זיכרון ל-worker שלך, עליך להקצות "mem": 2 * 1024 * 1024 * 1024.

%%writefile --append ./source_files/transpile_remote.py

@distribute_task(target={
"cpu": 16,
"mem": 2 * 1024 * 1024 * 1024
})
def transpile_remote(circuit, optimization_level, backend):
return None
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It checks the distributed program works.
test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully

ניהול נתונים לאורך התוכנית שלך

Qiskit Serverless מאפשרת לך לנהל קבצים בספריית /data לאורך כל התוכניות שלך. זה כולל מספר מגבלות:

  • רק קבצי tar ו-h5 נתמכים כיום
  • זה רק אחסון /data שטוח, ואין תמיכה בתת-ספריות /data/folder/

הקוד הבא מציג כיצד להעלות קבצים. ודא שאמת את עצמך ל-Qiskit Serverless עם חשבון IBM Quantum שלך (ראה פריסה ל-IBM Quantum Platform להוראות).

import tarfile
from qiskit_serverless import IBMServerlessClient

# Create a tar
filename = "transpile_demo.tar"
file = tarfile.open(filename, "w")
file.add("./source_files/transpile_remote.py")
file.close()

# Get a reference to a QiskitFunction
serverless = IBMServerlessClient()
transpile_remote_demo = next(
program
for program in serverless.list()
if program.title == "transpile_remote_serverless"
)

# Upload the tar to Serverless data directory
serverless.file_upload(file=filename, function=transpile_remote_demo)
'{"message":"/usr/src/app/media/5e1f442128cdf60018496a04/transpile_demo.tar"}'

לאחר מכן, תוכל לרשום את כל הקבצים בספריית data שלך. נתונים אלו נגישים לכל התוכניות.

serverless.files(function=transpile_remote_demo)
['classifier_name.pkl.tar', 'output.json.tar', 'transpile_demo.tar']

ניתן לעשות זאת מתוך תוכנית באמצעות file_download() כדי להוריד את הקובץ לסביבת התוכנית, ולפרוק את ה-tar.

%%writefile ./source_files/extract_tarfile.py

import tarfile
from qiskit_serverless import IBMServerlessClient

serverless = IBMServerlessClient(token="<YOUR_API_KEY>") # Use the 44-character API_KEY you created and saved from the IBM Quantum Platform Home dashboard
files = serverless.files()
demo_file = files[0]
downloaded_tar = serverless.file_download(demo_file)

with tarfile.open(downloaded_tar, 'r') as tar:
tar.extractall()

בשלב זה, התוכנית שלך יכולה לתקשר עם הקבצים, בדיוק כפי שהיית עושה בניסוי מקומי. file_upload(), file_download() ו-file_delete() ניתן לקרוא מהניסוי המקומי שלך, או מהתוכנית המועלית שלך, לניהול נתונים עקבי וגמיש.

צעדים הבאים

המלצות