Skip to content

Commit

Permalink
Run Black and flake8 to correct formatting of .py files
Browse files Browse the repository at this point in the history
  • Loading branch information
sase1988 committed Oct 11, 2023
1 parent 6d6f61a commit d3d1b7c
Show file tree
Hide file tree
Showing 5 changed files with 383 additions and 278 deletions.
270 changes: 168 additions & 102 deletions src/imagery/i.saocom/i.saocom.geocode/i.saocom.geocode.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,19 @@
# % multiple: no
# % description: Basename folder contaning the GCPS info. This folder is generated by i.saocom.import
# %end
#%option
#% key: location
#% type: string
#% multiple: no
#% required: yes
#% description: Location where the geocoded map will be stored
#%end
#%option G_OPT_M_DIR
#% key: dbase
#% multiple: no
#% required: yes
#% description: GRASS GIS database directory to store the external geocoded map
#%end
# %option
# % key: location
# % type: string
# % multiple: no
# % required: yes
# % description: Location where the geocoded map will be stored
# %end
# %option G_OPT_M_DIR
# % key: dbase
# % multiple: no
# % required: yes
# % description: GRASS GIS database directory to store the external geocoded map
# %end


import os
Expand All @@ -103,135 +103,201 @@
from grass.exceptions import ParameterError



def save_GeoTiff(fn, crs, transform, mat, meta=None, nodata=None, bandnames=[]):
if len(mat.shape)==2:
count=1
if len(mat.shape) == 2:
count = 1
else:
count=mat.shape[0]
count = mat.shape[0]

if not meta:
meta = {}

meta['driver'] = 'GTiff'
meta['height'] = mat.shape[-2]
meta['width'] = mat.shape[-1]
meta['count'] = count
meta['crs'] = crs
meta['transform'] = transform
meta["driver"] = "GTiff"
meta["height"] = mat.shape[-2]
meta["width"] = mat.shape[-1]
meta["count"] = count
meta["crs"] = crs
meta["transform"] = transform

if 'dtype' not in meta: #if no datatype is specified, use float32
meta['dtype'] = np.float32

if "dtype" not in meta: # if no datatype is specified, use float32
meta["dtype"] = np.float32

if nodata==None:
if nodata is None:
pass
else:
meta['nodata'] = nodata
meta["nodata"] = nodata

with rasterio.open(fn, 'w', **meta) as dst:
if count==1: # Mono-band raster
dst.write(mat.astype(meta['dtype']), 1)
with rasterio.open(fn, "w", **meta) as dst:
if count == 1: # Mono-band raster
dst.write(mat.astype(meta["dtype"]), 1)
if bandnames:
dst.set_band_description(1, bandnames[0])
else: # Multi-band raster
else: # Multi-band raster
for b in range(count):
dst.write(mat[b].astype(meta['dtype']), b+1)
for b,bandname in enumerate(bandnames):
dst.set_band_description(b+1, bandname)
dst.write(mat[b].astype(meta["dtype"]), b + 1)
for b, bandname in enumerate(bandnames):
dst.set_band_description(b + 1, bandname)


def read_gcps(gcp_base_folder):
df_gcps = pd.read_csv(os.path.join(gcp_base_folder,'GCPS.csv'))
df_gcps = pd.read_csv(os.path.join(gcp_base_folder, "GCPS.csv"))
gcps = []
for i,d in df_gcps.iterrows():
gcp = rasterio.control.GroundControlPoint(d.row, d.col, d.x, d.y, d.z, d.id, d.info)
gcps.append(gcp)
for i, d in df_gcps.iterrows():
gcp = rasterio.control.GroundControlPoint(
d.row, d.col, d.x, d.y, d.z, d.id, d.info
)
gcps.append(gcp)
return gcps

def geocode_file(input_map,basename,outdir,output_map,format = 'GTiff'):
#Write the map to Geotiff file
grass.run_command("r.out.gdal", input = input_map, output = os.path.join(outdir,output_map), format = 'GTiff')
#Read as rasterio dataset
ds = rasterio.open(os.path.join(outdir,output_map))

def geocode_file(input_map, basename, outdir, output_map, format="GTiff"):
# Write the map to Geotiff file
grass.run_command(
"r.out.gdal",
input=input_map,
output=os.path.join(outdir, output_map),
format="GTiff",
)
# Read as rasterio dataset
ds = rasterio.open(os.path.join(outdir, output_map))
ds = ds.read()[0]
#Read the GCPS dataframe
# Read the GCPS dataframe
env = grass.gisenv()
gcp_base_folder = os.path.join(env["GISDBASE"], env["LOCATION_NAME"], env["MAPSET"], "cell_misc",basename)
gcp_base_folder = os.path.join(
env["GISDBASE"], env["LOCATION_NAME"], env["MAPSET"], "cell_misc", basename
)
gcps = read_gcps(gcp_base_folder)
#Save the geocoded raster (it will replace the previous intermediate file)
# Save the geocoded raster (it will replace the previous intermediate file)
geoTs = rasterio.transform.from_gcps(gcps)
crs = rasterio.crs.CRS.from_epsg(4326)
save_GeoTiff(fn = os.path.join(outdir,output_map), crs = crs, transform = geoTs, mat = ds)

def export_to_location(outdir,location,input_map,int_map,env):
#Run gdalWarp. This is made to avoid ERROR: Input map is rotated - cannot import
output_warp = f'gdalwarp_{int_map}'
os.system(f'gdalwarp {os.path.join(outdir,int_map)} {os.path.join(outdir,output_warp)}')

grass.warning(_('Switching location'))

#Create the new location with EPSG:4326, in case it does not exist
save_GeoTiff(fn=os.path.join(outdir, output_map), crs=crs, transform=geoTs, mat=ds)


def export_to_location(outdir, location, input_map, int_map, env):
# Run gdalWarp. This is made to avoid ERROR: Input map is rotated - cannot import
output_warp = f"gdalwarp_{int_map}"
os.system(
f"gdalwarp {os.path.join(outdir,int_map)} {os.path.join(outdir,output_warp)}"
)

grass.warning(_("Switching location"))

# Create the new location with EPSG:4326, in case it does not exist
location_folder = env["GISDBASE"]
out_location = os.path.join(location_folder,location)
out_location = os.path.join(location_folder, location)
if not os.path.exists(out_location):
grass.create_location(env["GISDBASE"], location, epsg=4326, desc='Location created by i.saocom.geocode')

grass.run_command('g.mapset',mapset = 'PERMANENT',location = location)
grass.run_command("r.import", input = os.path.join(outdir,output_warp), output = f'{input_map}')
os.remove(os.path.join(outdir,output_warp))
grass.create_location(
env["GISDBASE"],
location,
epsg=4326,
desc="Location created by i.saocom.geocode",
)

grass.run_command("g.mapset", mapset="PERMANENT", location=location)
grass.run_command(
"r.import", input=os.path.join(outdir, output_warp), output=f"{input_map}"
)
os.remove(os.path.join(outdir, output_warp))



def main():
input_map = options['map']
input_map = options["map"]
data = options["data"]
zip_v = options["is_zip"]
pols = options['pols']
multilook = options['multilook']
pols = options["pols"]
multilook = options["multilook"]
basename = options["basename"]
location = options['location']
outdir = options['dbase']
location = options["location"]
outdir = options["dbase"]
env = grass.gisenv()

if not input_map and not data:
grass.fatal(_("Either one of input map or data folder/zip must be specified"))

if input_map and data:
grass.fatal(_("Either one of input map or data folder/zip must be specified, not both"))

grass.fatal(
_("Either one of input map or data folder/zip must be specified, not both")
)

if not input_map and data:
#Import real and imaginary bands to a temporary location and geocode them to external file
# Import real and imaginary bands to a temporary location and geocode them to external file
grass.message(_("Running i.saocom.import"))
grass.create_location(env["GISDBASE"], f'{basename}_XY_tempLocation',overwrite=1)
grass.run_command('g.mapset',mapset = 'PERMANENT',location = f'{basename}_XY_tempLocation')
grass.run_command('i.saocom.import',data=data,is_zip=zip_v,pols=pols,multilook=multilook,basename=basename)
#Get the list of maps to be geocoded
map_list = grass.list_grouped(type="raster",pattern = f'{basename}*')['PERMANENT']
grass.create_location(
env["GISDBASE"], f"{basename}_XY_tempLocation", overwrite=1
)
grass.run_command(
"g.mapset", mapset="PERMANENT", location=f"{basename}_XY_tempLocation"
)
grass.run_command(
"i.saocom.import",
data=data,
is_zip=zip_v,
pols=pols,
multilook=multilook,
basename=basename,
)
# Get the list of maps to be geocoded
map_list = grass.list_grouped(type="raster", pattern=f"{basename}*")[
"PERMANENT"
]
for m in map_list:
grass.run_command('g.region',raster=m)
geocode_file(input_map=m,basename=basename,outdir=outdir,output_map=f'{m}.tif',format = 'GTiff')
#Import the geocoded bands into the target location
export_to_location(outdir=outdir,location=location,input_map=f'{m}_geo',int_map=f'{m}.tif',env=env)
#Remove intermediate files
os.remove(os.path.join(outdir,f'{m}.tif'))
#Go back to temporary location to continue exporting
grass.run_command('g.mapset',mapset = 'PERMANENT',location = f'{basename}_XY_tempLocation')
#Go back to original location
grass.run_command('g.mapset',mapset = env["MAPSET"],location = env["LOCATION_NAME"])
shutil.rmtree(os.path.join(env["GISDBASE"], f'{basename}_XY_tempLocation'))

grass.run_command("g.region", raster=m)
geocode_file(
input_map=m,
basename=basename,
outdir=outdir,
output_map=f"{m}.tif",
format="GTiff",
)
# Import the geocoded bands into the target location
export_to_location(
outdir=outdir,
location=location,
input_map=f"{m}_geo",
int_map=f"{m}.tif",
env=env,
)
# Remove intermediate files
os.remove(os.path.join(outdir, f"{m}.tif"))
# Go back to temporary location to continue exporting
grass.run_command(
"g.mapset", mapset="PERMANENT", location=f"{basename}_XY_tempLocation"
)
# Go back to original location
grass.run_command(
"g.mapset", mapset=env["MAPSET"], location=env["LOCATION_NAME"]
)
shutil.rmtree(os.path.join(env["GISDBASE"], f"{basename}_XY_tempLocation"))

if input_map and not data:
#Check if this an XY location
proj = grass.read_command('g.proj', flags ='g').split('=')[1].split('\n')[0]
if proj != 'xy_location_unprojected':
raise ValueError('Current location is not XY unprojected (radar coordinates)')
geocode_file(input_map=input_map,basename=basename,outdir=outdir,output_map=f'{input_map}.tif',format = 'GTiff')
export_to_location(outdir=outdir,location=location,input_map=f'{input_map}_geo',int_map=f'{input_map}.tif',env=env)
#Remove intermediate files
os.remove(os.path.join(outdir,f'{input_map}.tif'))
#Go back to original location
grass.run_command('g.mapset',mapset = env["MAPSET"],location = env["LOCATION_NAME"])


# Check if this an XY location
proj = grass.read_command("g.proj", flags="g").split("=")[1].split("\n")[0]
if proj != "xy_location_unprojected":
raise ValueError(
"Current location is not XY unprojected (radar coordinates)"
)
geocode_file(
input_map=input_map,
basename=basename,
outdir=outdir,
output_map=f"{input_map}.tif",
format="GTiff",
)
export_to_location(
outdir=outdir,
location=location,
input_map=f"{input_map}_geo",
int_map=f"{input_map}.tif",
env=env,
)
# Remove intermediate files
os.remove(os.path.join(outdir, f"{input_map}.tif"))
# Go back to original location
grass.run_command(
"g.mapset", mapset=env["MAPSET"], location=env["LOCATION_NAME"]
)


if __name__ == "__main__":
options, flags = grass.parser()
main()
Loading

0 comments on commit d3d1b7c

Please sign in to comment.