diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py b/airflow/api_connexion/endpoints/asset_endpoint.py index ff47db883879..7915bf8b034b 100644 --- a/airflow/api_connexion/endpoints/asset_endpoint.py +++ b/airflow/api_connexion/endpoints/asset_endpoint.py @@ -222,7 +222,6 @@ def delete_dag_asset_queued_event( ) -@mark_fastapi_migration_done @security.requires_access_asset("GET") @security.requires_access_dag("GET") @provide_session diff --git a/airflow/api_fastapi/common/parameters.py b/airflow/api_fastapi/common/parameters.py index c573996eafd1..337d85547c3d 100644 --- a/airflow/api_fastapi/common/parameters.py +++ b/airflow/api_fastapi/common/parameters.py @@ -19,7 +19,7 @@ from abc import ABC, abstractmethod from datetime import datetime -from typing import TYPE_CHECKING, Annotated, Any, Callable, Generic, List, Optional, TypeVar, Union, overload +from typing import TYPE_CHECKING, Annotated, Any, Callable, Generic, List, Optional, TypeVar from fastapi import Depends, HTTPException, Query from pendulum.parsing.exceptions import ParserError @@ -409,27 +409,6 @@ def _safe_parse_datetime(date_to_check: str) -> datetime: """ if not date_to_check: raise ValueError(f"{date_to_check} cannot be None.") - return _safe_parse_datetime_optional(date_to_check) - - -@overload -def _safe_parse_datetime_optional(date_to_check: str) -> datetime: ... - - -@overload -def _safe_parse_datetime_optional(date_to_check: None) -> None: ... - - -def _safe_parse_datetime_optional(date_to_check: str | None) -> datetime | None: - """ - Parse datetime and raise error for invalid dates. - - Allow None values. - - :param date_to_check: the string value to be parsed - """ - if date_to_check is None: - return None try: return timezone.parse(date_to_check, strict=True) except (TypeError, ParserError): @@ -635,8 +614,7 @@ def depends_float( # Common Safe DateTime -DateTimeQuery = Annotated[datetime, AfterValidator(_safe_parse_datetime)] -OptionalDateTimeQuery = Annotated[Union[datetime, None], AfterValidator(_safe_parse_datetime_optional)] +DateTimeQuery = Annotated[str, AfterValidator(_safe_parse_datetime)] # DAG QueryLimit = Annotated[LimitFilter, Depends(LimitFilter().depends)] diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py b/airflow/api_fastapi/core_api/datamodels/assets.py index bfdbb2d7fc88..e5ac10715ed4 100644 --- a/airflow/api_fastapi/core_api/datamodels/assets.py +++ b/airflow/api_fastapi/core_api/datamodels/assets.py @@ -101,21 +101,6 @@ class AssetEventCollectionResponse(BaseModel): total_entries: int -class QueuedEventResponse(BaseModel): - """Queued Event serializer for responses..""" - - uri: str - dag_id: str - created_at: datetime - - -class QueuedEventCollectionResponse(BaseModel): - """Queued Event Collection serializer for responses.""" - - queued_events: list[QueuedEventResponse] - total_entries: int - - class CreateAssetEventsBody(BaseModel): """Create asset events request.""" diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index bdf1b8aef1bd..e7762392a0c8 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -47,14 +47,12 @@ paths: required: true schema: type: string - format: date-time title: Start Date - name: end_date in: query required: true schema: type: string - format: date-time title: End Date responses: '200': @@ -172,7 +170,7 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/assets: + /public/assets/: get: tags: - Asset @@ -348,7 +346,6 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/events: post: tags: - Asset @@ -356,11 +353,11 @@ paths: description: Create asset events. operationId: create_asset_event requestBody: + required: true content: application/json: schema: $ref: '#/components/schemas/CreateAssetEventsBody' - required: true responses: '200': description: Successful Response @@ -369,23 +366,23 @@ paths: schema: $ref: '#/components/schemas/AssetEventResponse' '401': - description: Unauthorized content: application/json: schema: $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized '403': - description: Forbidden content: application/json: schema: $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden '404': - description: Not Found content: application/json: schema: $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found '422': description: Validation Error content: @@ -437,60 +434,6 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/dags/{dag_id}/assets/queuedEvent: - get: - tags: - - Asset - summary: Get Dag Asset Queued Events - description: Get queued asset events for a DAG. - operationId: get_dag_asset_queued_events - parameters: - - name: dag_id - in: path - required: true - schema: - type: string - title: Dag Id - - name: before - in: query - required: false - schema: - anyOf: - - type: string - format: date-time - - type: 'null' - title: Before - responses: - '200': - description: Successful Response - content: - application/json: - schema: - $ref: '#/components/schemas/QueuedEventCollectionResponse' - '401': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Unauthorized - '403': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Forbidden - '404': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Not Found - '422': - description: Validation Error - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPValidationError' /public/backfills/: get: tags: @@ -5787,41 +5730,6 @@ components: - version title: ProviderResponse description: Provider serializer for responses. - QueuedEventCollectionResponse: - properties: - queued_events: - items: - $ref: '#/components/schemas/QueuedEventResponse' - type: array - title: Queued Events - total_entries: - type: integer - title: Total Entries - type: object - required: - - queued_events - - total_entries - title: QueuedEventCollectionResponse - description: Queued Event Collection serializer for responses. - QueuedEventResponse: - properties: - uri: - type: string - title: Uri - dag_id: - type: string - title: Dag Id - created_at: - type: string - format: date-time - title: Created At - type: object - required: - - uri - - dag_id - - created_at - title: QueuedEventResponse - description: Queued Event serializer for responses.. ReprocessBehavior: type: string enum: diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index 0900b0400987..326a387f0089 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -17,7 +17,6 @@ from __future__ import annotations -from datetime import datetime from typing import Annotated from fastapi import Depends, HTTPException, status @@ -26,7 +25,6 @@ from airflow.api_fastapi.common.db.common import get_session, paginated_select from airflow.api_fastapi.common.parameters import ( - OptionalDateTimeQuery, QueryAssetDagIdPatternSearch, QueryAssetIdFilter, QueryLimit, @@ -45,41 +43,18 @@ AssetEventResponse, AssetResponse, CreateAssetEventsBody, - QueuedEventCollectionResponse, - QueuedEventResponse, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.assets import Asset from airflow.assets.manager import asset_manager -from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel +from airflow.models.asset import AssetEvent, AssetModel from airflow.utils import timezone -assets_router = AirflowRouter(tags=["Asset"]) - - -def _generate_queued_event_where_clause( - *, - dag_id: str | None = None, - uri: str | None = None, - before: datetime | None = None, -) -> list: - """Get AssetDagRunQueue where clause.""" - where_clause = [] - if dag_id is not None: - where_clause.append(AssetDagRunQueue.target_dag_id == dag_id) - if uri is not None: - where_clause.append( - AssetDagRunQueue.asset_id.in_( - select(AssetModel.id).where(AssetModel.uri == uri), - ), - ) - if before is not None: - where_clause.append(AssetDagRunQueue.created_at < before) - return where_clause +assets_router = AirflowRouter(tags=["Asset"], prefix="/assets") @assets_router.get( - "/assets", + "/", responses=create_openapi_http_exception_doc([401, 403, 404]), ) def get_assets( @@ -114,7 +89,7 @@ def get_assets( @assets_router.get( - "/assets/events", + "/events", responses=create_openapi_http_exception_doc([404]), ) def get_asset_events( @@ -190,7 +165,7 @@ def create_asset_event( @assets_router.get( - "/assets/{uri:path}", + "/{uri:path}", responses=create_openapi_http_exception_doc([401, 403, 404]), ) def get_asset( @@ -208,47 +183,3 @@ def get_asset( raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with uri: `{uri}` was not found") return AssetResponse.model_validate(asset, from_attributes=True) - - -@assets_router.get( - "/dags/{dag_id}/assets/queuedEvent", - responses=create_openapi_http_exception_doc( - [ - status.HTTP_404_NOT_FOUND, - ] - ), -) -def get_dag_asset_queued_events( - dag_id: str, - session: Annotated[Session, Depends(get_session)], - before: OptionalDateTimeQuery = None, -) -> QueuedEventCollectionResponse: - """Get queued asset events for a DAG.""" - where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before) - query = ( - select(AssetDagRunQueue, AssetModel.uri) - .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) - .where(*where_clause) - ) - - dag_asset_queued_events_select, total_entries = paginated_select( - query, - [], - ) - adrqs = session.execute(dag_asset_queued_events_select).all() - - if not adrqs: - raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with dag_id: `{dag_id}` was not found") - - queued_events = [ - QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) - for adrq, uri in adrqs - ] - - return QueuedEventCollectionResponse( - queued_events=[ - QueuedEventResponse.model_validate(queued_event, from_attributes=True) - for queued_event in queued_events - ], - total_entries=total_entries, - ) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 7b23e33f0ab4..46940bfd3189 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -129,28 +129,6 @@ export const UseAssetServiceGetAssetKeyFn = ( }, queryKey?: Array, ) => [useAssetServiceGetAssetKey, ...(queryKey ?? [{ uri }])]; -export type AssetServiceGetDagAssetQueuedEventsDefaultResponse = Awaited< - ReturnType ->; -export type AssetServiceGetDagAssetQueuedEventsQueryResult< - TData = AssetServiceGetDagAssetQueuedEventsDefaultResponse, - TError = unknown, -> = UseQueryResult; -export const useAssetServiceGetDagAssetQueuedEventsKey = - "AssetServiceGetDagAssetQueuedEvents"; -export const UseAssetServiceGetDagAssetQueuedEventsKeyFn = ( - { - before, - dagId, - }: { - before?: string; - dagId: string; - }, - queryKey?: Array, -) => [ - useAssetServiceGetDagAssetQueuedEventsKey, - ...(queryKey ?? [{ before, dagId }]), -]; export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 0c522f36e433..4c541670258f 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -165,32 +165,6 @@ export const prefetchUseAssetServiceGetAsset = ( queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }), queryFn: () => AssetService.getAsset({ uri }), }); -/** - * Get Dag Asset Queued Events - * Get queued asset events for a DAG. - * @param data The data for the request. - * @param data.dagId - * @param data.before - * @returns QueuedEventCollectionResponse Successful Response - * @throws ApiError - */ -export const prefetchUseAssetServiceGetDagAssetQueuedEvents = ( - queryClient: QueryClient, - { - before, - dagId, - }: { - before?: string; - dagId: string; - }, -) => - queryClient.prefetchQuery({ - queryKey: Common.UseAssetServiceGetDagAssetQueuedEventsKeyFn({ - before, - dagId, - }), - queryFn: () => AssetService.getDagAssetQueuedEvents({ before, dagId }), - }); /** * Historical Metrics * Return cluster activity historical metrics. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 8ec0ea9234ac..a96b09e12a79 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -213,39 +213,6 @@ export const useAssetServiceGetAsset = < queryFn: () => AssetService.getAsset({ uri }) as TData, ...options, }); -/** - * Get Dag Asset Queued Events - * Get queued asset events for a DAG. - * @param data The data for the request. - * @param data.dagId - * @param data.before - * @returns QueuedEventCollectionResponse Successful Response - * @throws ApiError - */ -export const useAssetServiceGetDagAssetQueuedEvents = < - TData = Common.AssetServiceGetDagAssetQueuedEventsDefaultResponse, - TError = unknown, - TQueryKey extends Array = unknown[], ->( - { - before, - dagId, - }: { - before?: string; - dagId: string; - }, - queryKey?: TQueryKey, - options?: Omit, "queryKey" | "queryFn">, -) => - useQuery({ - queryKey: Common.UseAssetServiceGetDagAssetQueuedEventsKeyFn( - { before, dagId }, - queryKey, - ), - queryFn: () => - AssetService.getDagAssetQueuedEvents({ before, dagId }) as TData, - ...options, - }); /** * Historical Metrics * Return cluster activity historical metrics. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 1b8142228153..43331b187fe0 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -195,39 +195,6 @@ export const useAssetServiceGetAssetSuspense = < queryFn: () => AssetService.getAsset({ uri }) as TData, ...options, }); -/** - * Get Dag Asset Queued Events - * Get queued asset events for a DAG. - * @param data The data for the request. - * @param data.dagId - * @param data.before - * @returns QueuedEventCollectionResponse Successful Response - * @throws ApiError - */ -export const useAssetServiceGetDagAssetQueuedEventsSuspense = < - TData = Common.AssetServiceGetDagAssetQueuedEventsDefaultResponse, - TError = unknown, - TQueryKey extends Array = unknown[], ->( - { - before, - dagId, - }: { - before?: string; - dagId: string; - }, - queryKey?: TQueryKey, - options?: Omit, "queryKey" | "queryFn">, -) => - useSuspenseQuery({ - queryKey: Common.UseAssetServiceGetDagAssetQueuedEventsKeyFn( - { before, dagId }, - queryKey, - ), - queryFn: () => - AssetService.getDagAssetQueuedEvents({ before, dagId }) as TData, - ...options, - }); /** * Historical Metrics * Return cluster activity historical metrics. diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 1f83e434286b..e5ac0441a2aa 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -2880,48 +2880,6 @@ export const $ProviderResponse = { description: "Provider serializer for responses.", } as const; -export const $QueuedEventCollectionResponse = { - properties: { - queued_events: { - items: { - $ref: "#/components/schemas/QueuedEventResponse", - }, - type: "array", - title: "Queued Events", - }, - total_entries: { - type: "integer", - title: "Total Entries", - }, - }, - type: "object", - required: ["queued_events", "total_entries"], - title: "QueuedEventCollectionResponse", - description: "Queued Event Collection serializer for responses.", -} as const; - -export const $QueuedEventResponse = { - properties: { - uri: { - type: "string", - title: "Uri", - }, - dag_id: { - type: "string", - title: "Dag Id", - }, - created_at: { - type: "string", - format: "date-time", - title: "Created At", - }, - }, - type: "object", - required: ["uri", "dag_id", "created_at"], - title: "QueuedEventResponse", - description: "Queued Event serializer for responses..", -} as const; - export const $ReprocessBehavior = { type: "string", enum: ["failed", "completed", "none"], diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index c39dce38d34d..53bb3527d142 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -13,8 +13,6 @@ import type { CreateAssetEventResponse, GetAssetData, GetAssetResponse, - GetDagAssetQueuedEventsData, - GetDagAssetQueuedEventsResponse, HistoricalMetricsData, HistoricalMetricsResponse, RecentDagRunsData, @@ -169,7 +167,7 @@ export class AssetService { ): CancelablePromise { return __request(OpenAPI, { method: "GET", - url: "/public/assets", + url: "/public/assets/", query: { limit: data.limit, offset: data.offset, @@ -239,7 +237,7 @@ export class AssetService { ): CancelablePromise { return __request(OpenAPI, { method: "POST", - url: "/public/events", + url: "/public/assets/events", body: data.requestBody, mediaType: "application/json", errors: { @@ -276,36 +274,6 @@ export class AssetService { }, }); } - - /** - * Get Dag Asset Queued Events - * Get queued asset events for a DAG. - * @param data The data for the request. - * @param data.dagId - * @param data.before - * @returns QueuedEventCollectionResponse Successful Response - * @throws ApiError - */ - public static getDagAssetQueuedEvents( - data: GetDagAssetQueuedEventsData, - ): CancelablePromise { - return __request(OpenAPI, { - method: "GET", - url: "/public/dags/{dag_id}/assets/queuedEvent", - path: { - dag_id: data.dagId, - }, - query: { - before: data.before, - }, - errors: { - 401: "Unauthorized", - 403: "Forbidden", - 404: "Not Found", - 422: "Validation Error", - }, - }); - } } export class DashboardService { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 96d6b812897f..078699cc0f2b 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -712,23 +712,6 @@ export type ProviderResponse = { version: string; }; -/** - * Queued Event Collection serializer for responses. - */ -export type QueuedEventCollectionResponse = { - queued_events: Array; - total_entries: number; -}; - -/** - * Queued Event serializer for responses.. - */ -export type QueuedEventResponse = { - uri: string; - dag_id: string; - created_at: string; -}; - /** * Internal enum for setting reprocess behavior in a backfill. * @@ -1062,13 +1045,6 @@ export type GetAssetData = { export type GetAssetResponse = AssetResponse; -export type GetDagAssetQueuedEventsData = { - before?: string | null; - dagId: string; -}; - -export type GetDagAssetQueuedEventsResponse = QueuedEventCollectionResponse; - export type HistoricalMetricsData = { endDate: string; startDate: string; @@ -1559,7 +1535,7 @@ export type $OpenApiTs = { }; }; }; - "/public/assets": { + "/public/assets/": { get: { req: GetAssetsData; res: { @@ -1612,8 +1588,6 @@ export type $OpenApiTs = { 422: HTTPValidationError; }; }; - }; - "/public/events": { post: { req: CreateAssetEventData; res: { @@ -1667,33 +1641,6 @@ export type $OpenApiTs = { }; }; }; - "/public/dags/{dag_id}/assets/queuedEvent": { - get: { - req: GetDagAssetQueuedEventsData; - res: { - /** - * Successful Response - */ - 200: QueuedEventCollectionResponse; - /** - * Unauthorized - */ - 401: HTTPExceptionResponse; - /** - * Forbidden - */ - 403: HTTPExceptionResponse; - /** - * Not Found - */ - 404: HTTPExceptionResponse; - /** - * Validation Error - */ - 422: HTTPValidationError; - }; - }; - }; "/ui/dashboard/historical_metrics_data": { get: { req: HistoricalMetricsData; diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index b5744e47edf8..42b7acd908ff 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -24,13 +24,7 @@ import time_machine from airflow.models import DagModel -from airflow.models.asset import ( - AssetDagRunQueue, - AssetEvent, - AssetModel, - DagScheduleAssetReference, - TaskOutletAssetReference, -) +from airflow.models.asset import AssetEvent, AssetModel, DagScheduleAssetReference, TaskOutletAssetReference from airflow.models.dagrun import DagRun from airflow.utils import timezone from airflow.utils.session import provide_session @@ -470,7 +464,7 @@ def test_should_respond_404(self, test_client): assert response.json()["detail"] == "The Asset with uri: `s3://bucket/key` was not found" -class TestQueuedEventEndpoint(TestAssets): +class TestPostAssetEvents(TestAssets): @pytest.fixture def time_freezer(self) -> Generator: freezer = time_machine.travel(self.default_time, tick=False) @@ -480,50 +474,6 @@ def time_freezer(self) -> Generator: freezer.stop() - def _create_asset_dag_run_queues(self, dag_id, asset_id, session): - adrq = AssetDagRunQueue(target_dag_id=dag_id, asset_id=asset_id) - session.add(adrq) - session.commit() - return adrq - - -class TestGetDagAssetQueuedEvents(TestQueuedEventEndpoint): - @pytest.mark.usefixtures("time_freezer") - def test_should_respond_200(self, test_client, session, create_dummy_dag): - dag, _ = create_dummy_dag() - dag_id = dag.dag_id - self.create_assets(session=session, num=1) - asset_id = 1 - self._create_asset_dag_run_queues(dag_id, asset_id, session) - - response = test_client.get( - f"/public/dags/{dag_id}/assets/queuedEvent", - ) - - assert response.status_code == 200 - assert response.json() == { - "queued_events": [ - { - "created_at": self.default_time.replace("+00:00", "Z"), - "uri": "s3://bucket/key/1", - "dag_id": "dag", - } - ], - "total_entries": 1, - } - - def test_should_respond_404(self, test_client): - dag_id = "not_exists" - - response = test_client.get( - f"/public/dags/{dag_id}/assets/queuedEvent", - ) - - assert response.status_code == 404 - assert response.json()["detail"] == "Queue event with dag_id: `not_exists` was not found" - - -class TestPostAssetEvents(TestAssets): @pytest.mark.usefixtures("time_freezer") def test_should_respond_200(self, test_client, session): self.create_assets()