Skip to content

Commit

Permalink
Fixup ASTER update geometries (#132)
Browse files Browse the repository at this point in the history
* feat: fixup aster update geometries
  • Loading branch information
gadomski authored Jan 24, 2023
1 parent 19ba090 commit 0b68462
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 17 deletions.
41 changes: 27 additions & 14 deletions datasets/aster/aster.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,29 +92,34 @@ def run(self, input: CreateChunksInput, context: TaskContext) -> CreateChunksOut
item_collection = read_item_collection(
asset["href"], storage_options=asset["table:storage_options"]
)
chunks = []
chunk = []
for item in itertools.islice(item_collection, input.limit):
chunk.append(fix_dict(item.to_dict(include_self_link=False)))
if len(chunk) >= input.chunk_size:
chunks.append(chunk)
chunk = []
if chunk:
chunks.append(chunk)
output = []
for i, chunk in enumerate(chunks):
uri = f"{input.dst_uri}/{asset['partition_number']}/{i}.ndjson"

def write_chunk(chunk: List[Item], chunk_number: int) -> int:
    """Write one chunk as an NDJSON file and record it in ``output``.

    Each element of ``chunk`` is serialized with ``orjson.dumps`` and the
    results are joined with newlines into a single text blob written to
    ``{input.dst_uri}/{asset['partition_number']}/{chunk_number}.ndjson``.
    A ``Chunk`` entry describing the written file is appended to the
    enclosing ``output`` list.

    This is a closure: it reads ``input``, ``asset``, ``context`` and
    ``output`` from the enclosing scope.

    NOTE(review): the caller appends ``fix_dict(item.to_dict(...))`` results
    to ``chunk``, so the elements are presumably plain dicts rather than
    ``Item`` objects -- confirm and tighten the ``chunk`` annotation.

    Args:
        chunk: The items to write, one per output line.
        chunk_number: Index of this chunk; used as the file stem and as the
            ``Chunk`` id.

    Returns:
        The chunk number to use for the next chunk (``chunk_number + 1``).
    """
    uri = f"{input.dst_uri}/{asset['partition_number']}/{chunk_number}.ndjson"
    storage, path = context.storage_factory.get_storage_for_file(uri)
    storage.write_text(
        path, "\n".join(orjson.dumps(item).decode("utf-8") for item in chunk)
    )
    output.append(
        Chunk(
            uri=storage.get_uri(path),
            id=str(chunk_number),
            partition_number=str(asset["partition_number"]),
        )
    )
    # The caller threads the counter through the return value rather than
    # mutating it from inside the closure.
    return chunk_number + 1

chunk = []
chunk_number = 0
for item in itertools.islice(item_collection, input.limit):
chunk.append(
fix_dict(item.to_dict(include_self_link=False, transform_hrefs=False))
)
if len(chunk) >= input.chunk_size:
chunk_number = write_chunk(chunk, chunk_number)
chunk = []
if chunk:
write_chunk(chunk, chunk_number)
return CreateChunksOutput(chunks=output)


Expand Down Expand Up @@ -147,9 +152,17 @@ def run(self, input: UpdateItemsInput, context: TaskContext) -> UpdateItemsOutpu
new_item = sign_and_update(item, input.simplify_tolerance)
except Exception as e:
logger.error(e)
error_items.append(fix_dict(item.to_dict(include_self_link=False)))
error_items.append(
fix_dict(
item.to_dict(include_self_link=False, transform_hrefs=False)
)
)
else:
items.append(fix_dict(new_item.to_dict(include_self_link=False)))
items.append(
fix_dict(
new_item.to_dict(include_self_link=False, transform_hrefs=False)
)
)
logger.info(f"{len(items)} items updated, {len(error_items)} errors")
storage, path = context.storage_factory.get_storage_for_file(
f"{input.item_chunkset_uri}/{input.partition_number}/{input.chunk_id}.ndjson"
Expand Down
2 changes: 1 addition & 1 deletion datasets/aster/dataset.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# TODO actually implement this -- this is currently a placeholder just to upload the collection
id: aster
image: ${{ args.registry }}/pctasks-task-base:latest
image: ${{ args.registry }}/pctasks-aster:latest

args:
- registry
Expand Down
28 changes: 28 additions & 0 deletions datasets/aster/update-geometries-ingest.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
name: Ingest NDJsons from blob://astersa/aster-etl-data/items/update-geometries
jobs:
ingest-items:
id: ingest-items
tasks:
- id: ingest-ndjson
image_key: ingest
task: pctasks.ingest_task.task:ingest_task
args:
content:
type: Ndjson
ndjson_folder:
uri: blob://astersa/aster-etl-data/items/update-geometries
extensions:
- .ndjson
matches: \d+.ndjson
options:
insert_group_size: 5000
insert_only: false
environment:
AZURE_TENANT_ID: ${{ secrets.task-tenant-id }}
AZURE_CLIENT_ID: ${{ secrets.task-client-id }}
AZURE_CLIENT_SECRET: ${{ secrets.task-client-secret }}
schema_version: 1.0.0
schema_version: 1.0.0
id: aster-update-geometries-ingest
dataset: microsoft/aster-l1t

6 changes: 4 additions & 2 deletions datasets/aster/update-geometries.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ jobs:
task: aster:create_chunks_task
args:
asset: ${{ item }}
dst_uri: blob://astersa/aster-etl-data/chunks/update-geometries
dst_uri: blob://astersa/aster-etl-data/chunks/update-geometries-2023-01-23-00
chunk_size: 5000
environment:
AZURE_TENANT_ID: ${{ secrets.task-tenant-id }}
AZURE_CLIENT_ID: ${{ secrets.task-client-id }}
AZURE_CLIENT_SECRET: ${{ secrets.task-client-secret }}
tags:
batch_pool_id: high_memory_pool
update-items:
foreach:
items: ${{ jobs.create-chunks.tasks.create-chunks.output.chunks }}
Expand All @@ -54,7 +56,7 @@ jobs:
partition_number: ${{ item.partition_number }}
chunk_uri: ${{ item.uri }}
chunk_id: ${{ item.id }}
item_chunkset_uri: blob://astersa/aster-etl-data/items/update-geometries
item_chunkset_uri: blob://astersa/aster-etl-data/items/update-geometries-2023-01-23-00
simplify_tolerance: 0.001
environment:
AZURE_TENANT_ID: ${{ secrets.task-tenant-id }}
Expand Down

0 comments on commit 0b68462

Please sign in to comment.