import json
from django.contrib.gis.db.models.fields import BaseSpatialField
from django.contrib.gis.db.models.functions import Distance
from django.contrib.gis.db.models.lookups import DistanceLookupBase, GISLookup
from django.contrib.gis.gdal import GDALRaster
from django.contrib.gis.geos import GEOSGeometry
from django.contrib.gis.measure import D
from django.contrib.gis.shortcuts import numpy
from django.db import connection
from django.db.models import F, Func, Q
from django.test import TransactionTestCase, skipUnlessDBFeature
from django.test.utils import CaptureQueriesContext
from ..data.rasters.textrasters import JSON_RASTER
from .models import RasterModel, RasterRelatedModel
@skipUnlessDBFeature("supports_raster")
class RasterFieldTest(TransactionTestCase):
available_apps = ["gis_tests.rasterapp"]
def setUp(self):
rast = GDALRaster(
{"srid": 4326,
"origin": [0, 0],
"scale": [-1, 1],
"skew": [0, 0],
"width": 5,
"height": 5,
"nr_of_bands": 2,
"bands": [{"data": range(25)}, {"data": range(25, 50)}],
})model_instance = RasterModel.objects.create(
rast=rast,
rastprojected=rast,
geom="POINT (-95.37040 29.70486)",
)RasterRelatedModel.objects.create(rastermodel=model_instance)
def test_field_null_value(self):
"""
Test creating a model where the RasterField has a null value."""r = RasterModel.objects.create(rast=None)
r.refresh_from_db()
self.assertIsNone(r.rast)
def test_access_band_data_directly_from_queryset(self):
RasterModel.objects.create(rast=JSON_RASTER)
qs = RasterModel.objects.all()
qs[0].rast.bands[0].data()
def test_deserialize_with_pixeltype_flags(self):
no_data = 3
rast = GDALRaster(
{"srid": 4326,
"origin": [0, 0],
"scale": [-1, 1],
"skew": [0, 0],
"width": 1,
"height": 1,
"nr_of_bands": 1,
"bands": [{"data": [no_data], "nodata_value": no_data}],
})r = RasterModel.objects.create(rast=rast)
RasterModel.objects.filter(pk=r.pk).update(
rast=Func(F("rast"), function="ST_SetBandIsNoData"),
)r.refresh_from_db()
band = r.rast.bands[0].data()
if numpy:
band = band.flatten().tolist()
self.assertEqual(band, [no_data])
self.assertEqual(r.rast.bands[0].nodata_value, no_data)
def test_model_creation(self):
"""
Test RasterField through a test model."""# Create model instance from JSON raster
r = RasterModel.objects.create(rast=JSON_RASTER)
r.refresh_from_db()
# Test raster metadata properties
self.assertEqual((5, 5), (r.rast.width, r.rast.height))
self.assertEqual([0.0, -1.0, 0.0, 0.0, 0.0, 1.0], r.rast.geotransform)
self.assertIsNone(r.rast.bands[0].nodata_value)
# Compare srs
self.assertEqual(r.rast.srs.srid, 4326)
# Compare pixel values
band = r.rast.bands[0].data()
# If numpy, convert result to list
if numpy:
band = band.flatten().tolist()
# Loop through rows in band data and assert single
# value is as expected.
self.assertEqual(
[0.0,
1.0,
2.0,
3.0,
4.0,
5.0,
6.0,
7.0,
8.0,
9.0,
10.0,
11.0,
12.0,
13.0,
14.0,
15.0,
16.0,
17.0,
18.0,
19.0,
20.0,
21.0,
22.0,
23.0,
24.0,
],band,)def test_implicit_raster_transformation(self):
"""
Test automatic transformation of rasters with srid different from thefield srid."""# Parse json raster
rast = json.loads(JSON_RASTER)
# Update srid to another value
rast["srid"] = 3086
# Save model and get it from db
r = RasterModel.objects.create(rast=rast)
r.refresh_from_db()
# Confirm raster has been transformed to the default srid
self.assertEqual(r.rast.srs.srid, 4326)
# Confirm geotransform is in lat/lon
expected = [-87.9298551266551,
9.459646421449934e-06,
0.0,
23.94249275457565,
0.0,
-9.459646421449934e-06,
]for val, exp in zip(r.rast.geotransform, expected):
self.assertAlmostEqual(exp, val)
def test_verbose_name_arg(self):
"""
RasterField should accept a positional verbose name argument."""self.assertEqual(
RasterModel._meta.get_field("rast").verbose_name, "A Verbose Raster Name"
)def test_all_gis_lookups_with_rasters(self):
"""
Evaluate all possible lookups for all input combinations (i.e.raster-raster, raster-geom, geom-raster) and for projected andunprojected coordinate systems. This test just checks that the lookupcan be called, but doesn't check if the result makes logical sense."""from django.contrib.gis.db.backends.postgis.operations import PostGISOperations
# Create test raster and geom.
rast = GDALRaster(json.loads(JSON_RASTER))
stx_pnt = GEOSGeometry("POINT (-95.370401017314293 29.704867409475465)", 4326)
stx_pnt.transform(3086)
lookups = [(name, lookup)for name, lookup in BaseSpatialField.get_lookups().items()
if issubclass(lookup, GISLookup)
]self.assertNotEqual(lookups, [], "No lookups found")
# Loop through all the GIS lookups.
for name, lookup in lookups:
# Construct lookup filter strings.
combo_keys = [field + namefor field in [
"rast__",
"rast__",
"rastprojected__0__",
"rast__",
"rastprojected__",
"geom__",
"rast__",
]]if issubclass(lookup, DistanceLookupBase):
# Set lookup values for distance lookups.
combo_values = [(rast, 50, "spheroid"),
(rast, 0, 50, "spheroid"),
(rast, 0, D(km=1)),
(stx_pnt, 0, 500),
(stx_pnt, D(km=1000)),
(rast, 500),
(json.loads(JSON_RASTER), 500),
]elif name == "relate":
# Set lookup values for the relate lookup.
combo_values = [(rast, "T*T***FF*"),
(rast, 0, "T*T***FF*"),
(rast, 0, "T*T***FF*"),
(stx_pnt, 0, "T*T***FF*"),
(stx_pnt, "T*T***FF*"),
(rast, "T*T***FF*"),
(json.loads(JSON_RASTER), "T*T***FF*"),
]elif name == "isvalid":
# The isvalid lookup doesn't make sense for rasters.
continue
elif PostGISOperations.gis_operators[name].func:
# Set lookup values for all function based operators.
combo_values = [rast,(rast, 0),
(rast, 0),
(stx_pnt, 0),
stx_pnt,rast,json.loads(JSON_RASTER),
]else:
# Override band lookup for these, as it's not supported.
combo_keys[2] = "rastprojected__" + name
# Set lookup values for all other operators.
combo_values = [rast,None,
rast,stx_pnt,stx_pnt,rast,json.loads(JSON_RASTER),
]# Create query filter combinations.
self.assertEqual(
len(combo_keys),
len(combo_values),
"Number of lookup names and values should be the same",
)combos = [x for x in zip(combo_keys, combo_values) if x[1]]
self.assertEqual(
[(n, x) for n, x in enumerate(combos) if x in combos[:n]],
[],"There are repeated test lookups",
)combos = [{k: v} for k, v in combos]
for combo in combos:
# Apply this query filter.
qs = RasterModel.objects.filter(**combo)
# Evaluate normal filter qs.
self.assertIn(qs.count(), [0, 1])
# Evaluate on conditional Q expressions.
qs = RasterModel.objects.filter(Q(**combos[0]) & Q(**combos[1]))
self.assertIn(qs.count(), [0, 1])
def test_dwithin_gis_lookup_output_with_rasters(self):
"""
Check the logical functionality of the dwithin lookup for differentinput parameters."""# Create test raster and geom.
rast = GDALRaster(json.loads(JSON_RASTER))
stx_pnt = GEOSGeometry("POINT (-95.370401017314293 29.704867409475465)", 4326)
stx_pnt.transform(3086)
# Filter raster with different lookup raster formats.
qs = RasterModel.objects.filter(rastprojected__dwithin=(rast, D(km=1)))
self.assertEqual(qs.count(), 1)
qs = RasterModel.objects.filter(
rastprojected__dwithin=(json.loads(JSON_RASTER), D(km=1))
)self.assertEqual(qs.count(), 1)
qs = RasterModel.objects.filter(rastprojected__dwithin=(JSON_RASTER, D(km=1)))
self.assertEqual(qs.count(), 1)
# Filter in an unprojected coordinate system.
qs = RasterModel.objects.filter(rast__dwithin=(rast, 40))
self.assertEqual(qs.count(), 1)
# Filter with band index transform.
qs = RasterModel.objects.filter(rast__1__dwithin=(rast, 1, 40))
self.assertEqual(qs.count(), 1)
qs = RasterModel.objects.filter(rast__1__dwithin=(rast, 40))
self.assertEqual(qs.count(), 1)
qs = RasterModel.objects.filter(rast__dwithin=(rast, 1, 40))
self.assertEqual(qs.count(), 1)
# Filter raster by geom.
qs = RasterModel.objects.filter(rast__dwithin=(stx_pnt, 500))
self.assertEqual(qs.count(), 1)
qs = RasterModel.objects.filter(rastprojected__dwithin=(stx_pnt, D(km=10000)))
self.assertEqual(qs.count(), 1)
qs = RasterModel.objects.filter(rast__dwithin=(stx_pnt, 5))
self.assertEqual(qs.count(), 0)
qs = RasterModel.objects.filter(rastprojected__dwithin=(stx_pnt, D(km=100)))
self.assertEqual(qs.count(), 0)
# Filter geom by raster.
qs = RasterModel.objects.filter(geom__dwithin=(rast, 500))
self.assertEqual(qs.count(), 1)
# Filter through related model.
qs = RasterRelatedModel.objects.filter(rastermodel__rast__dwithin=(rast, 40))
self.assertEqual(qs.count(), 1)
# Filter through related model with band index transform
qs = RasterRelatedModel.objects.filter(rastermodel__rast__1__dwithin=(rast, 40))
self.assertEqual(qs.count(), 1)
# Filter through conditional statements.
qs = RasterModel.objects.filter(
Q(rast__dwithin=(rast, 40))
& Q(rastprojected__dwithin=(stx_pnt, D(km=10000)))
)self.assertEqual(qs.count(), 1)
# Filter through different lookup.
qs = RasterModel.objects.filter(rastprojected__bbcontains=rast)
self.assertEqual(qs.count(), 1)
def test_lookup_input_tuple_too_long(self):
rast = GDALRaster(json.loads(JSON_RASTER))
msg = "Tuple too long for lookup bbcontains."
with self.assertRaisesMessage(ValueError, msg):
RasterModel.objects.filter(rast__bbcontains=(rast, 1, 2))
def test_lookup_input_band_not_allowed(self):
rast = GDALRaster(json.loads(JSON_RASTER))
qs = RasterModel.objects.filter(rast__bbcontains=(rast, 1))
msg = "Band indices are not allowed for this operator, it works on bbox only."
with self.assertRaisesMessage(ValueError, msg):
qs.count()
def test_isvalid_lookup_with_raster_error(self):
qs = RasterModel.objects.filter(rast__isvalid=True)
msg = ("IsValid function requires a GeometryField in position 1, got RasterField."
)with self.assertRaisesMessage(TypeError, msg):
qs.count()
def test_result_of_gis_lookup_with_rasters(self):
# Point is in the interior
qs = RasterModel.objects.filter(
rast__contains=GEOSGeometry("POINT (-0.5 0.5)", 4326)
)self.assertEqual(qs.count(), 1)
# Point is in the exterior
qs = RasterModel.objects.filter(
rast__contains=GEOSGeometry("POINT (0.5 0.5)", 4326)
)self.assertEqual(qs.count(), 0)
# A point on the boundary is not contained properly
qs = RasterModel.objects.filter(
rast__contains_properly=GEOSGeometry("POINT (0 0)", 4326)
)self.assertEqual(qs.count(), 0)
# Raster is located left of the point
qs = RasterModel.objects.filter(rast__left=GEOSGeometry("POINT (1 0)", 4326))
self.assertEqual(qs.count(), 1)
def test_lookup_with_raster_bbox(self):
rast = GDALRaster(json.loads(JSON_RASTER))
# Shift raster upward
rast.origin.y = 2
# The raster in the model is not strictly below
qs = RasterModel.objects.filter(rast__strictly_below=rast)
self.assertEqual(qs.count(), 0)
# Shift raster further upward
rast.origin.y = 6
# The raster in the model is strictly below
qs = RasterModel.objects.filter(rast__strictly_below=rast)
self.assertEqual(qs.count(), 1)
def test_lookup_with_polygonized_raster(self):
rast = GDALRaster(json.loads(JSON_RASTER))
# Move raster to overlap with the model point on the left side
rast.origin.x = -95.37040 + 1
rast.origin.y = 29.70486
# Raster overlaps with point in model
qs = RasterModel.objects.filter(geom__intersects=rast)
self.assertEqual(qs.count(), 1)
# Change left side of raster to be nodata values
rast.bands[0].data(data=[0, 0, 0, 1, 1], shape=(5, 1))
rast.bands[0].nodata_value = 0
qs = RasterModel.objects.filter(geom__intersects=rast)
# Raster does not overlap anymore after polygonization
# where the nodata zone is not included.
self.assertEqual(qs.count(), 0)
def test_lookup_value_error(self):
# Test with invalid dict lookup parameter
obj = {}msg = "Couldn't create spatial object from lookup value '%s'." % obj
with self.assertRaisesMessage(ValueError, msg):
RasterModel.objects.filter(geom__intersects=obj)
# Test with invalid string lookup parameter
obj = "00000"
msg = "Couldn't create spatial object from lookup value '%s'." % obj
with self.assertRaisesMessage(ValueError, msg):
RasterModel.objects.filter(geom__intersects=obj)
def test_db_function_errors(self):
"""
Errors are raised when using DB functions with raster content."""point = GEOSGeometry("SRID=3086;POINT (-697024.9213808845 683729.1705516104)")
rast = GDALRaster(json.loads(JSON_RASTER))
msg = "Distance function requires a geometric argument in position 2."
with self.assertRaisesMessage(TypeError, msg):
RasterModel.objects.annotate(distance_from_point=Distance("geom", rast))
with self.assertRaisesMessage(TypeError, msg):
RasterModel.objects.annotate(
distance_from_point=Distance("rastprojected", rast)
)msg = ("Distance function requires a GeometryField in position 1, got RasterField."
)with self.assertRaisesMessage(TypeError, msg):
RasterModel.objects.annotate(
distance_from_point=Distance("rastprojected", point)
).count()
def test_lhs_with_index_rhs_without_index(self):
with CaptureQueriesContext(connection) as queries:
RasterModel.objects.filter(
rast__0__contains=json.loads(JSON_RASTER)
).exists()
# It's easier to check the indexes in the generated SQL than to write
# tests that cover all index combinations.
self.assertRegex(queries[-1]["sql"], r"WHERE ST_Contains\([^)]*, 1, [^)]*, 1\)")